[GitHub] [flink] kl0u commented on pull request #14028: [FLINK-20020][client] Make UnsuccessfulExecutionException part of the JobClient.getJobExecutionResult() contract
kl0u commented on pull request #14028: URL: https://github.com/apache/flink/pull/14028#issuecomment-725908109 Also, why not put the logic directly in `JobResult.toJobExecutionResult()` instead of keeping it separate in each client? That would lead to more robust code. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] SteNicholas commented on pull request #14028: [FLINK-20020][client] Make UnsuccessfulExecutionException part of the JobClient.getJobExecutionResult() contract
SteNicholas commented on pull request #14028: URL: https://github.com/apache/flink/pull/14028#issuecomment-725907605 > Hi @SteNicholas , I'm reviewing your PR now. One thing to add is some tests (for each client) that verify that all clients throw the expected type of exception. This is to make sure that no one breaks the contract in the future. Also this can become part of the javadoc of `JobClient.getJobExecutionResult()`. I forgot to add the `JobExecutionResultException` contract to the javadoc of `JobClient.getJobExecutionResult()`. As for the tests, I thought the current test cases already verify that all clients throw the expected type of exception, but it's better to add tests for each client as well.
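The contract under discussion can be illustrated with a minimal sketch. All names below (`JobResult`, `to_job_execution_result`, `UnsuccessfulExecutionException`) are simplified stand-ins mirroring the class names mentioned in the thread, not Flink's actual signatures; the point is only that centralizing the failure-to-exception conversion in one method means every client that delegates to it honors the contract automatically:

```python
class UnsuccessfulExecutionException(Exception):
    """Raised when a job did not finish successfully (illustrative)."""


class JobResult:
    """Minimal stand-in for Flink's JobResult (illustrative only)."""

    def __init__(self, job_id, success, failure_cause=None):
        self.job_id = job_id
        self.success = success
        self.failure_cause = failure_cause

    def to_job_execution_result(self):
        # Putting the check here, rather than duplicating it in each
        # client, means any client calling this method raises the same
        # exception type, so the contract cannot drift between clients.
        if not self.success:
            raise UnsuccessfulExecutionException(self.failure_cause)
        return {"job_id": self.job_id}
```

Per-client tests would then only need to verify that each client actually routes through this shared conversion.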
[jira] [Updated] (FLINK-20098) Don't add flink-connector-files to flink-dist
[ https://issues.apache.org/jira/browse/FLINK-20098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-20098: Issue Type: Improvement (was: Bug)
> Don't add flink-connector-files to flink-dist
> Key: FLINK-20098
> URL: https://issues.apache.org/jira/browse/FLINK-20098
> Project: Flink
> Issue Type: Improvement
> Components: API / DataStream
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
> Priority: Major
> Fix For: 1.12.0
>
> We currently add both {{flink-connector-files}} and {{flink-connector-base}} to {{flink-dist}}.
> This implies that users should use the dependency like this:
> {code}
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-connector-files</artifactId>
>     <version>${project.version}</version>
>     <scope>provided</scope>
> </dependency>
> {code}
> which differs from other connectors, where users don't need to specify {{provided}}.
> Also, {{flink-connector-files}} has {{flink-connector-base}} as a provided dependency, which means that examples using this dependency will not run out of the box in IntelliJ, because transitive provided dependencies are not considered.
> I propose to just remove the dependencies from {{flink-dist}} and let users use the File Connector like any other connector.
> I believe the initial motivation for "providing" the File Connector in {{flink-dist}} was to allow us to use the File Connector under the hood in methods such as {{StreamExecutionEnvironment.readFile(...)}}. We could decide to deprecate and remove those methods, or re-add the File Connector as an explicit (non-provided) dependency again in the future.
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-20013) BoundedBlockingSubpartition may leak network buffer if task is failed or canceled
[ https://issues.apache.org/jira/browse/FLINK-20013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17230407#comment-17230407 ] Arvid Heise commented on FLINK-20013: Could you please create PRs for the backports, so that we can see from CI that there are no side effects? > BoundedBlockingSubpartition may leak network buffer if task is failed or canceled > Key: FLINK-20013 > URL: https://issues.apache.org/jira/browse/FLINK-20013 > Project: Flink > Issue Type: Bug > Components: Runtime / Network > Reporter: Yingjie Cao > Assignee: Nicholas Jiang > Priority: Major > Labels: pull-request-available > Fix For: 1.12.0 > > BoundedBlockingSubpartition may leak a network buffer if the task is failed or canceled. We need to recycle the current BufferConsumer when the task is failed or canceled.
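The fix the issue describes follows a general resource-cleanup pattern: release the in-flight buffer on every exit path, not only on success. A minimal sketch of that pattern (the names `BufferConsumer` and `recycle` mirror the issue text; everything else is illustrative, not Flink's actual code):

```python
class BufferConsumer:
    """Illustrative stand-in for a pooled network buffer."""

    def __init__(self):
        self.recycled = False

    def recycle(self):
        # Return the buffer to the pool exactly once.
        assert not self.recycled, "double recycle"
        self.recycled = True


def write_and_finish(consumer, records, cancel_at=None):
    """Write records; `cancel_at` simulates a task failure/cancellation."""
    try:
        for i, record in enumerate(records):
            if cancel_at is not None and i == cancel_at:
                raise RuntimeError("task canceled")
            # ... write record into the buffer ...
    finally:
        # Recycling in `finally` covers the failed/canceled paths too,
        # which is exactly the leak the issue describes.
        consumer.recycle()
```

Without the `finally`, the cancellation path would skip `recycle()` and the buffer would never return to the pool.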
[jira] [Closed] (FLINK-19872) Fix CSV format is unable to parse millisecond for TIME type
[ https://issues.apache.org/jira/browse/FLINK-19872?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu closed FLINK-19872. --- Resolution: Fixed Fixed in master (1.12.0): e86b40909dcd4a0f53fa900989cf2655ff8f6c19 > Fix CSV format is unable to parse millisecond for TIME type > --- > > Key: FLINK-19872 > URL: https://issues.apache.org/jira/browse/FLINK-19872 > Project: Flink > Issue Type: Sub-task > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table > SQL / Ecosystem >Reporter: Fangliang Liu >Assignee: Fangliang Liu >Priority: Major > Labels: pull-request-available > Fix For: 1.12.0 > >
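For reference, the kind of value this fix concerns is a TIME literal with a fractional-second (millisecond) part. A quick standard-library check of the format in question (this is plain Python, not Flink's CSV parser; the sample value `12:34:56.789` is arbitrary):

```python
from datetime import datetime

# "%f" parses the fractional part; milliseconds land in `microsecond`.
t = datetime.strptime("12:34:56.789", "%H:%M:%S.%f").time()
milliseconds = t.microsecond // 1000  # 789
```

A parser that only handles `HH:mm:ss` would reject or truncate such values, which is the behavior the issue title describes.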
[GitHub] [flink] wuchong commented on pull request #13834: [FLINK-19872][csv] Fix CSV format is unable to parse millisecond for TIME type
wuchong commented on pull request #13834: URL: https://github.com/apache/flink/pull/13834#issuecomment-725904101 Merging...
[GitHub] [flink] wuchong merged pull request #13834: [FLINK-19872][csv] Fix CSV format is unable to parse millisecond for TIME type
wuchong merged pull request #13834: URL: https://github.com/apache/flink/pull/13834
[jira] [Updated] (FLINK-20101) Fix the wrong documentation of FROM_UNIXTIME function
[ https://issues.apache.org/jira/browse/FLINK-20101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu updated FLINK-20101: Description: The return value should be '1970-01-01 00:00:44' in UTC time zone. https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/systemFunctions.html#temporal-functions !image-2020-11-12-15-43-16-755.png|thumbnail! was: The return value should be '1970-01-01 00:00:44' in UTC time zone. !image-2020-11-12-15-43-16-755.png|thumbnail! > Fix the wrong documentation of FROM_UNIXTIME function > Key: FLINK-20101 > URL: https://issues.apache.org/jira/browse/FLINK-20101 > Project: Flink > Issue Type: Bug > Components: Documentation, Table SQL / API > Reporter: Jark Wu > Priority: Major > Fix For: 1.12.0, 1.11.3 > Attachments: image-2020-11-12-15-43-16-755.png > > The return value should be '1970-01-01 00:00:44' in UTC time zone. > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/systemFunctions.html#temporal-functions > !image-2020-11-12-15-43-16-755.png|thumbnail!
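The corrected value stated in the issue can be checked with plain epoch arithmetic in the standard library (this is not Flink's `FROM_UNIXTIME` implementation; it only demonstrates that epoch second 44 formats as the value the issue says the docs should show):

```python
from datetime import datetime, timezone

# FROM_UNIXTIME(44) rendered in the UTC time zone:
s = datetime.fromtimestamp(44, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
# '1970-01-01 00:00:44'
```

Note that Flink's actual function formats in the session time zone, which is why the documentation must state the time zone it assumes.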
[GitHub] [flink] flinkbot commented on pull request #14043: [FLINK-20046][python] Fix the unstable test case 'StreamTableAggregateTests.test_map_view_iterate'.
flinkbot commented on pull request #14043: URL: https://github.com/apache/flink/pull/14043#issuecomment-725903766 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit f1050a9fbd8c9df2d58ab8915c4fece3eb285d77 (Thu Nov 12 07:49:34 UTC 2020) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[jira] [Commented] (FLINK-19585) UnalignedCheckpointCompatibilityITCase.test:97->runAndTakeSavepoint: "Not all required tasks are currently running."
[ https://issues.apache.org/jira/browse/FLINK-19585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17230405#comment-17230405 ] Arvid Heise commented on FLINK-19585: - Backport will happen after [~trohrmann] fixed FLINK-20065 as I currently have no way of reliably testing it. > UnalignedCheckpointCompatibilityITCase.test:97->runAndTakeSavepoint: "Not all > required tasks are currently running." > > > Key: FLINK-19585 > URL: https://issues.apache.org/jira/browse/FLINK-19585 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.12.0, 1.11.2 >Reporter: Robert Metzger >Assignee: Arvid Heise >Priority: Critical > Labels: pull-request-available, test-stability > Fix For: 1.12.0 > > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=7419=logs=5c8e7682-d68f-54d1-16a2-a09310218a49=f508e270-48d6-5f1e-3138-42a17e0714f0 > {code} > 2020-10-12T10:27:51.7667213Z [ERROR] Tests run: 4, Failures: 0, Errors: 1, > Skipped: 0, Time elapsed: 13.146 s <<< FAILURE! - in > org.apache.flink.test.checkpointing.UnalignedCheckpointCompatibilityITCase > 2020-10-12T10:27:51.7675454Z [ERROR] test[type: SAVEPOINT, startAligned: > false](org.apache.flink.test.checkpointing.UnalignedCheckpointCompatibilityITCase) > Time elapsed: 2.168 s <<< ERROR! > 2020-10-12T10:27:51.7676759Z java.util.concurrent.ExecutionException: > java.util.concurrent.CompletionException: > org.apache.flink.runtime.checkpoint.CheckpointException: Not all required > tasks are currently running. 
> 2020-10-12T10:27:51.7686572Z at > java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) > 2020-10-12T10:27:51.7688239Z at > java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) > 2020-10-12T10:27:51.7689543Z at > org.apache.flink.test.checkpointing.UnalignedCheckpointCompatibilityITCase.runAndTakeSavepoint(UnalignedCheckpointCompatibilityITCase.java:113) > 2020-10-12T10:27:51.7690681Z at > org.apache.flink.test.checkpointing.UnalignedCheckpointCompatibilityITCase.test(UnalignedCheckpointCompatibilityITCase.java:97) > 2020-10-12T10:27:51.7691513Z at > sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > 2020-10-12T10:27:51.7692182Z at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > 2020-10-12T10:27:51.7692964Z at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > 2020-10-12T10:27:51.7693655Z at > java.lang.reflect.Method.invoke(Method.java:498) > 2020-10-12T10:27:51.7694489Z at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > 2020-10-12T10:27:51.7707103Z at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > 2020-10-12T10:27:51.7729199Z at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > 2020-10-12T10:27:51.7730097Z at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > 2020-10-12T10:27:51.7730833Z at > org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) > 2020-10-12T10:27:51.7731500Z at > org.junit.rules.RunRules.evaluate(RunRules.java:20) > 2020-10-12T10:27:51.7732086Z at > org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > 2020-10-12T10:27:51.7732781Z at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > 2020-10-12T10:27:51.7733563Z at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > 
2020-10-12T10:27:51.7734735Z at > org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > 2020-10-12T10:27:51.7735400Z at > org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > 2020-10-12T10:27:51.7736075Z at > org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > 2020-10-12T10:27:51.7736757Z at > org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > 2020-10-12T10:27:51.7737432Z at > org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > 2020-10-12T10:27:51.7738081Z at > org.junit.runners.ParentRunner.run(ParentRunner.java:363) > 2020-10-12T10:27:51.7739008Z at > org.junit.runners.Suite.runChild(Suite.java:128) > 2020-10-12T10:27:51.7739583Z at > org.junit.runners.Suite.runChild(Suite.java:27) > 2020-10-12T10:27:51.7740173Z at > org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > 2020-10-12T10:27:51.7740800Z at > org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > 2020-10-12T10:27:51.7741470Z at > org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > 2020-10-12T10:27:51.7742150Z at >
[GitHub] [flink] twalthr commented on pull request #14035: [FLINK-19340][table-runtime-blink] Support NestedRowData in RowDataSerializer#copy
twalthr commented on pull request #14035: URL: https://github.com/apache/flink/pull/14035#issuecomment-725903180 Thanks @wuchong. I updated the PR.
[GitHub] [flink] WeiZhong94 opened a new pull request #14043: [FLINK-20046][python] Fix the unstable test case 'StreamTableAggregateTests.test_map_view_iterate'.
WeiZhong94 opened a new pull request #14043: URL: https://github.com/apache/flink/pull/14043 ## What is the purpose of the change *This pull request fixes the unstable test case 'StreamTableAggregateTests.test_map_view_iterate'.* ## Brief change log - *Fix the unstable test case 'StreamTableAggregateTests.test_map_view_iterate'.* ## Verifying this change This change is already covered by existing tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable)
[jira] [Updated] (FLINK-20046) StreamTableAggregateTests.test_map_view_iterate is instable
[ https://issues.apache.org/jira/browse/FLINK-20046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-20046: --- Labels: pull-request-available test-stability (was: test-stability) > StreamTableAggregateTests.test_map_view_iterate is instable > --- > > Key: FLINK-20046 > URL: https://issues.apache.org/jira/browse/FLINK-20046 > Project: Flink > Issue Type: Bug > Components: API / Python >Affects Versions: 1.12.0 >Reporter: Dian Fu >Assignee: Wei Zhong >Priority: Critical > Labels: pull-request-available, test-stability > Fix For: 1.12.0 > > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9279=logs=821b528f-1eed-5598-a3b4-7f748b13f261=4fad9527-b9a5-5015-1b70-8356e5c91490 > {code} > 2020-11-07T22:50:57.4180758Z ___ > StreamTableAggregateTests.test_map_view_iterate > 2020-11-07T22:50:57.4181301Z > 2020-11-07T22:50:57.4181965Z self = > testMethod=test_map_view_iterate> > 2020-11-07T22:50:57.4182348Z > 2020-11-07T22:50:57.4182535Z def test_map_view_iterate(self): > 2020-11-07T22:50:57.4182812Z test_iterate = > udaf(TestIterateAggregateFunction()) > 2020-11-07T22:50:57.4183320Z > self.t_env.get_config().set_idle_state_retention(datetime.timedelta(days=1)) > 2020-11-07T22:50:57.4183763Z > self.t_env.get_config().get_configuration().set_string( > 2020-11-07T22:50:57.4297555Z "python.fn-execution.bundle.size", > "2") > 2020-11-07T22:50:57.4297922Z # trigger the cache eviction in a bundle. 
> 2020-11-07T22:50:57.4308028Z > self.t_env.get_config().get_configuration().set_string( > 2020-11-07T22:50:57.4308653Z "python.state.cache-size", "2") > 2020-11-07T22:50:57.4308945Z > self.t_env.get_config().get_configuration().set_string( > 2020-11-07T22:50:57.4309382Z "python.map-state.read-cache-size", > "2") > 2020-11-07T22:50:57.4309676Z > self.t_env.get_config().get_configuration().set_string( > 2020-11-07T22:50:57.4310428Z "python.map-state.write-cache-size", > "2") > 2020-11-07T22:50:57.4310701Z > self.t_env.get_config().get_configuration().set_string( > 2020-11-07T22:50:57.4311130Z > "python.map-state.iterate-response-batch-size", "2") > 2020-11-07T22:50:57.4311361Z t = self.t_env.from_elements( > 2020-11-07T22:50:57.4311691Z [(1, 'Hi_', 'hi'), > 2020-11-07T22:50:57.4312004Z (1, 'Hi', 'hi'), > 2020-11-07T22:50:57.4312316Z (2, 'hello', 'hello'), > 2020-11-07T22:50:57.4312639Z (3, 'Hi_', 'hi'), > 2020-11-07T22:50:57.4312975Z (3, 'Hi', 'hi'), > 2020-11-07T22:50:57.4313285Z (4, 'hello', 'hello'), > 2020-11-07T22:50:57.4313609Z (5, 'Hi2_', 'hi'), > 2020-11-07T22:50:57.4313908Z (5, 'Hi2', 'hi'), > 2020-11-07T22:50:57.4314238Z (6, 'hello2', 'hello'), > 2020-11-07T22:50:57.4314558Z (7, 'Hi', 'hi'), > 2020-11-07T22:50:57.4315053Z (8, 'hello', 'hello'), > 2020-11-07T22:50:57.4315396Z (9, 'Hi2', 'hi'), > 2020-11-07T22:50:57.4315773Z (13, 'Hi3', 'hi')], ['a', 'b', 'c']) > 2020-11-07T22:50:57.4316023Z > self.t_env.create_temporary_view("source", t) > 2020-11-07T22:50:57.4316299Z table_with_retract_message = > self.t_env.sql_query( > 2020-11-07T22:50:57.4316615Z "select LAST_VALUE(b) as b, > LAST_VALUE(c) as c from source group by a") > 2020-11-07T22:50:57.4316919Z result = > table_with_retract_message.group_by(t.c) \ > 2020-11-07T22:50:57.4317197Z > .select(test_iterate(t.b).alias("a"), t.c) \ > 2020-11-07T22:50:57.4317619Z .select(col("a").get(0).alias("a"), > 2020-11-07T22:50:57.4318111Z col("a").get(1).alias("b"), > 2020-11-07T22:50:57.4318357Z 
col("a").get(2).alias("c"), > 2020-11-07T22:50:57.4318586Z col("a").get(3).alias("d"), > 2020-11-07T22:50:57.4318814Z t.c.alias("e")) > 2020-11-07T22:50:57.4319023Z assert_frame_equal( > 2020-11-07T22:50:57.4319208Z > result.to_pandas(), > 2020-11-07T22:50:57.4319408Z pd.DataFrame([ > 2020-11-07T22:50:57.4319872Z ["hello,hello2", "1,3", > 'hello:3,hello2:1', 2, "hello"], > 2020-11-07T22:50:57.4320398Z ["Hi,Hi2,Hi3", "1,2,3", > "Hi:3,Hi2:2,Hi3:1", 3, "hi"]], > 2020-11-07T22:50:57.4321047Z columns=['a', 'b', 'c', 'd', > 'e'])) > 2020-11-07T22:50:57.4321198Z > 2020-11-07T22:50:57.4321385Z pyflink/table/tests/test_aggregate.py:468: > 2020-11-07T22:50:57.4321648Z _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > _ _ _ _ _ _ _ _ _ _
[GitHub] [flink] kl0u commented on pull request #14028: [FLINK-20020][client] Make UnsuccessfulExecutionException part of the JobClient.getJobExecutionResult() contract
kl0u commented on pull request #14028: URL: https://github.com/apache/flink/pull/14028#issuecomment-725901773 Hi @SteNicholas , I'm reviewing your PR now. One thing to add is some tests (for each client) that verify that all clients throw the expected type of exception. This is to make sure that no one breaks the contract in the future. This could also become part of the javadoc of `JobClient.getJobExecutionResult()`.
[jira] [Created] (FLINK-20101) Fix the wrong documentation of FROM_UNIXTIME function
Jark Wu created FLINK-20101: --- Summary: Fix the wrong documentation of FROM_UNIXTIME function Key: FLINK-20101 URL: https://issues.apache.org/jira/browse/FLINK-20101 Project: Flink Issue Type: Bug Components: Documentation, Table SQL / API Reporter: Jark Wu Fix For: 1.12.0, 1.11.3 Attachments: image-2020-11-12-15-43-16-755.png The return value should be '1970-01-01 00:00:44' in UTC time zone. !image-2020-11-12-15-43-16-755.png|thumbnail!
[jira] [Commented] (FLINK-19667) Add integration with AWS Glue
[ https://issues.apache.org/jira/browse/FLINK-19667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17230398#comment-17230398 ] Robert Metzger commented on FLINK-19667: Yes, the cutoff date for new features was last Sunday. The Glue integration will have to go into Flink 1.13, sorry. > Add integration with AWS Glue > - > > Key: FLINK-19667 > URL: https://issues.apache.org/jira/browse/FLINK-19667 > Project: Flink > Issue Type: New Feature > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.11.0, 1.11.1, 1.11.2, 1.11.3 >Reporter: Mohit Paliwal >Priority: Major > Labels: AWS, Glue > Fix For: 1.12.0 > > > AWS Glue is releasing new features for the AWS Glue Data Catalog. This > request is to add a new format to launch an integration for Apache Flink with > AWS Glue Data Catalog
[GitHub] [flink] morsapaes commented on a change in pull request #14041: [FLINK-20096][docs] Clean up PyFlink docs
morsapaes commented on a change in pull request #14041: URL: https://github.com/apache/flink/pull/14041#discussion_r521887420 ## File path: docs/dev/python/table-api-users-guide/index.md ## @@ -24,7 +24,6 @@ under the License. --> Python Table API allows users to develop [Table API]({% link dev/table/tableApi.md %}) programs using the Python language. Review comment: In general, I'm confused whether the standard is to call it Python Table API or PyFlink Table API. As is, there's a mix of both in the docs.
[GitHub] [flink] morsapaes commented on a change in pull request #14041: [FLINK-20096][docs] Clean up PyFlink docs
morsapaes commented on a change in pull request #14041: URL: https://github.com/apache/flink/pull/14041#discussion_r521870246 ## File path: docs/dev/python/table-api-users-guide/conversion_of_pandas.md ## @@ -22,17 +22,18 @@ specific language governing permissions and limitations under the License. --> -It supports to convert between PyFlink Table and Pandas DataFrame. +PyFlink Table API supports conversion to and from Pandas DataFrame. * This will be replaced by the TOC {:toc} ## Convert Pandas DataFrame to PyFlink Table -It supports creating a PyFlink Table from a Pandas DataFrame. Internally, it will serialize the Pandas DataFrame -using Arrow columnar format at client side and the serialized data will be processed and deserialized in Arrow source -during execution. The Arrow source could also be used in streaming jobs and it will properly handle the checkpoint -and provides the exactly once guarantees. +Pandas DataFrames can be converted into a PyFlink TAble. +Internally, PyFlink will serialize the Pandas DataFrame using Arrow columnar format on the client. +The serialized data will be processed and deserialized in Arrow source during execution. +The Arrow source can also be used in streaming jobs, and is integrated with checkpointing to +and provide the exactly once guarantees. Review comment: ```suggestion provide exactly-once guarantees. ```
[GitHub] [flink] morsapaes commented on a change in pull request #14041: [FLINK-20096][docs] Clean up PyFlink docs
morsapaes commented on a change in pull request #14041: URL: https://github.com/apache/flink/pull/14041#discussion_r521871549 ## File path: docs/dev/python/table-api-users-guide/conversion_of_pandas.md ## @@ -60,11 +61,11 @@ table = t_env.from_pandas(pdf, ## Convert PyFlink Table to Pandas DataFrame -It also supports converting a PyFlink Table to a Pandas DataFrame. Internally, it will materialize the results of the -table and serialize them into multiple Arrow batches of Arrow columnar format at client side. The maximum Arrow batch size -is determined by the config option [python.fn-execution.arrow.batch.size]({% link dev/python/table-api-users-guide/python_config.md %}#python-fn-execution-arrow-batch-size). -The serialized data will then be converted to Pandas DataFrame. It will collect the content of the table to -the client side and so please make sure that the content of the table could fit in memory before calling this method. +PyFlink Tables can additionally be converted into a Pandas DataFrame. +The resulting rows will materialized into multiple Arrow batches of Arrow columnar format on the client. +The maximum Arrow batch size is configured via the option [python.fn-execution.arrow.batch.size]({% link dev/python/table-api-users-guide/python_config.md %}#python-fn-execution-arrow-batch-size). +The serialized data will then be converted to a Pandas DataFrame. +Because the contents of the table will be collected on the client, please ensure that the results of the table can fit in memory before calling this method. Review comment: ```suggestion PyFlink Tables can additionally be converted into a Pandas DataFrame. The resulting rows will be serialized as multiple Arrow batches of Arrow columnar format on the client. The maximum Arrow batch size is configured via the option [python.fn-execution.arrow.batch.size]({% link dev/python/table-api-users-guide/python_config.md %}#python-fn-execution-arrow-batch-size). 
The serialized data will then be converted to a Pandas DataFrame. Because the contents of the table will be collected on the client, please ensure that the results can fit in memory before calling this method. ``` ## File path: docs/dev/python/table-api-users-guide/conversion_of_pandas.md ## @@ -22,17 +22,18 @@ specific language governing permissions and limitations under the License. --> -It supports to convert between PyFlink Table and Pandas DataFrame. +PyFlink Table API supports conversion to and from Pandas DataFrame. * This will be replaced by the TOC {:toc} ## Convert Pandas DataFrame to PyFlink Table -It supports creating a PyFlink Table from a Pandas DataFrame. Internally, it will serialize the Pandas DataFrame -using Arrow columnar format at client side and the serialized data will be processed and deserialized in Arrow source -during execution. The Arrow source could also be used in streaming jobs and it will properly handle the checkpoint -and provides the exactly once guarantees. +Pandas DataFrames can be converted into a PyFlink TAble. +Internally, PyFlink will serialize the Pandas DataFrame using Arrow columnar format on the client. +The serialized data will be processed and deserialized in Arrow source during execution. +The Arrow source can also be used in streaming jobs, and is integrated with checkpointing to +and provide the exactly once guarantees. Review comment: ```suggestion provide the exactly once guarantees. ``` ## File path: docs/dev/python/table-api-users-guide/conversion_of_pandas.md ## @@ -22,17 +22,18 @@ specific language governing permissions and limitations under the License. --> -It supports to convert between PyFlink Table and Pandas DataFrame. +PyFlink Table API supports conversion to and from Pandas DataFrame. * This will be replaced by the TOC {:toc} ## Convert Pandas DataFrame to PyFlink Table -It supports creating a PyFlink Table from a Pandas DataFrame. 
Internally, it will serialize the Pandas DataFrame -using Arrow columnar format at client side and the serialized data will be processed and deserialized in Arrow source -during execution. The Arrow source could also be used in streaming jobs and it will properly handle the checkpoint -and provides the exactly once guarantees. +Pandas DataFrames can be converted into a PyFlink TAble. Review comment: ```suggestion Pandas DataFrames can be converted into a PyFlink Table. ``` ## File path: docs/dev/python/table-api-users-guide/index.md ## @@ -24,7 +24,6 @@ under the License. --> Python Table API allows users to develop [Table API]({% link dev/table/tableApi.md %}) programs using the Python language. Review comment: ```suggestion The Python Table API allows users to develop [Table API]({% link dev/table/tableApi.md %}) programs using the Python language. ``` ## File path:
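The batching behavior these docs describe, where results are serialized as multiple Arrow batches whose size is capped by `python.fn-execution.arrow.batch.size`, can be sketched with plain Python (illustrative only; this is not PyFlink's implementation, and the batch size of 2 is arbitrary):

```python
def to_batches(rows, batch_size):
    """Chunk a sequence of rows into batches of at most `batch_size` rows,
    mirroring how results are capped per Arrow batch (sketch only)."""
    return [rows[i:i + batch_size] for i in range(0, len(rows), batch_size)]


# Five result rows with a maximum batch size of 2 yield three batches.
batches = to_batches(list(range(5)), batch_size=2)
```

This is also why the docs warn about memory: `to_pandas()` collects every batch onto the client before assembling the DataFrame, so the full result set must fit in client memory.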
[GitHub] [flink] flinkbot edited a comment on pull request #13834: [FLINK-19872][csv] Fix CSV format is unable to parse millisecond for TIME type
flinkbot edited a comment on pull request #13834: URL: https://github.com/apache/flink/pull/13834#issuecomment-718406146 ## CI report: * 6dd7a167a257fe6379340600a53ab4262c35f62f Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9486) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build
[jira] [Commented] (FLINK-19253) SourceReaderTestBase.testAddSplitToExistingFetcher hangs
[ https://issues.apache.org/jira/browse/FLINK-19253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17230377#comment-17230377 ] Xuannan Su commented on FLINK-19253: [~becket_qin] I think https://issues.apache.org/jira/browse/FLINK-19448 has fixed the problem. Can you take a look? > SourceReaderTestBase.testAddSplitToExistingFetcher hangs > > > Key: FLINK-19253 > URL: https://issues.apache.org/jira/browse/FLINK-19253 > Project: Flink > Issue Type: Bug > Components: Connectors / Common >Affects Versions: 1.12.0 >Reporter: Dian Fu >Assignee: Xuannan Su >Priority: Major > Labels: pull-request-available, test-stability > Fix For: 1.12.0 > > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=6521=logs=fc5181b0-e452-5c8f-68de-1097947f6483=62110053-334f-5295-a0ab-80dd7e2babbf > {code} > 2020-09-15T10:51:35.5236837Z "SourceFetcher" #39 prio=5 os_prio=0 > tid=0x7f70d0a57000 nid=0x858 in Object.wait() [0x7f6fd81f] > 2020-09-15T10:51:35.5237447Zjava.lang.Thread.State: WAITING (on object > monitor) > 2020-09-15T10:51:35.5237962Z at java.lang.Object.wait(Native Method) > 2020-09-15T10:51:35.5238886Z - waiting on <0xc27f5be8> (a > java.util.ArrayDeque) > 2020-09-15T10:51:35.5239380Z at java.lang.Object.wait(Object.java:502) > 2020-09-15T10:51:35.5240401Z at > org.apache.flink.connector.base.source.reader.mocks.TestingSplitReader.fetch(TestingSplitReader.java:52) > 2020-09-15T10:51:35.5241471Z - locked <0xc27f5be8> (a > java.util.ArrayDeque) > 2020-09-15T10:51:35.5242180Z at > org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) > 2020-09-15T10:51:35.5243245Z at > org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:128) > 2020-09-15T10:51:35.5244263Z at > org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:95) > 2020-09-15T10:51:35.5245128Z at > 
org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:42) > 2020-09-15T10:51:35.5245973Z at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > 2020-09-15T10:51:35.5247081Z at > java.util.concurrent.FutureTask.run(FutureTask.java:266) > 2020-09-15T10:51:35.5247816Z at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > 2020-09-15T10:51:35.5248809Z at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > 2020-09-15T10:51:35.5249463Z at java.lang.Thread.run(Thread.java:748) > 2020-09-15T10:51:35.5249827Z > 2020-09-15T10:51:35.5250383Z "SourceFetcher" #37 prio=5 os_prio=0 > tid=0x7f70d0a4b000 nid=0x856 in Object.wait() [0x7f6f80cfa000] > 2020-09-15T10:51:35.5251124Zjava.lang.Thread.State: WAITING (on object > monitor) > 2020-09-15T10:51:35.5251636Z at java.lang.Object.wait(Native Method) > 2020-09-15T10:51:35.5252767Z - waiting on <0xc298d0b8> (a > java.util.ArrayDeque) > 2020-09-15T10:51:35.5253336Z at java.lang.Object.wait(Object.java:502) > 2020-09-15T10:51:35.5254184Z at > org.apache.flink.connector.base.source.reader.mocks.TestingSplitReader.fetch(TestingSplitReader.java:52) > 2020-09-15T10:51:35.5255220Z - locked <0xc298d0b8> (a > java.util.ArrayDeque) > 2020-09-15T10:51:35.5255678Z at > org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) > 2020-09-15T10:51:35.5256235Z at > org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:128) > 2020-09-15T10:51:35.5256803Z at > org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:95) > 2020-09-15T10:51:35.5257351Z at > org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:42) > 2020-09-15T10:51:35.5257838Z at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > 2020-09-15T10:51:35.5258284Z at > java.util.concurrent.FutureTask.run(FutureTask.java:266) 
> 2020-09-15T10:51:35.5258856Z at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > 2020-09-15T10:51:35.5259350Z at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > 2020-09-15T10:51:35.5260011Z at java.lang.Thread.run(Thread.java:748) > 2020-09-15T10:51:35.5260211Z > 2020-09-15T10:51:35.5260574Z "process reaper" #24 daemon prio=10 os_prio=0 > tid=0x7f6f70042000 nid=0x844 waiting on condition [0x7f6fd832a000] > 2020-09-15T10:51:35.5261036Zjava.lang.Thread.State: TIMED_WAITING > (parking) > 2020-09-15T10:51:35.5261342Z at sun.misc.Unsafe.park(Native Method) > 2020-09-15T10:51:35.5261972Z - parking to wait for <0x815d0810> (a >
[jira] [Updated] (FLINK-20100) Lag aggregate function does not return lag, but current row
[ https://issues.apache.org/jira/browse/FLINK-20100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu updated FLINK-20100: Component/s: Table SQL / Planner > Lag aggregate function does not return lag, but current row > --- > > Key: FLINK-20100 > URL: https://issues.apache.org/jira/browse/FLINK-20100 > Project: Flink > Issue Type: Bug > Components: API / Python, Table SQL / Planner >Affects Versions: 1.11.2 >Reporter: Thilo Schneider >Priority: Major > > The lag aggregate function seems to always return the current row and not the > row one lagged behind: > {code:java} > from pyflink.table import EnvironmentSettings, StreamTableEnvironment > env_settings = > EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build() > t_env = StreamTableEnvironment.create( environment_settings=env_settings) > t_env.execute_sql(""" > CREATE TABLE datagen ( > foo INT, > message_time AS to_timestamp(from_unixtime(foo)), > WATERMARK FOR message_time AS message_time > ) WITH ( > 'connector' = 'datagen', > 'rows-per-second'='3', > 'fields.foo.kind'='sequence', > 'fields.foo.start'='1', > 'fields.foo.end'='10' > )""") > t = t_env.sql_query("SELECT foo, lag(foo) OVER w AS lagfoo FROM datagen > WINDOW w AS (ORDER BY message_time)") > t_env.execute_sql("CREATE TABLE output (foo INT, lagfoo INT) WITH > ('connector' = 'print')") > t.execute_insert("output") > {code} > This results in > {code:java} > +I(1,1) // Expected (1, null) > +I(2,2) // Expected (2, 1) > +I(3,3) // Expected (3, 2) > +I(4,4) // and so on > +I(5,5) > +I(6,6) > +I(7,7) > +I(8,8) > +I(9,9) > +I(10,10) > {code} > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot edited a comment on pull request #13834: [FLINK-19872][csv] Fix CSV format is unable to parse millisecond for TIME type
flinkbot edited a comment on pull request #13834: URL: https://github.com/apache/flink/pull/13834#issuecomment-718406146 ## CI report: * 63f6644db59a1a512ceef894317b3bf29355b7a0 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9485) * 6dd7a167a257fe6379340600a53ab4262c35f62f Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9486) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-20100) Lag aggregate function does not return lag, but current row
[ https://issues.apache.org/jira/browse/FLINK-20100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thilo Schneider updated FLINK-20100: Description: The lag aggregate function seems to always return the current row and not the row one lagged behind: {code:java} from pyflink.table import EnvironmentSettings, StreamTableEnvironment env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build() t_env = StreamTableEnvironment.create( environment_settings=env_settings) t_env.execute_sql(""" CREATE TABLE datagen ( foo INT, message_time AS to_timestamp(from_unixtime(foo)), WATERMARK FOR message_time AS message_time ) WITH ( 'connector' = 'datagen', 'rows-per-second'='3', 'fields.foo.kind'='sequence', 'fields.foo.start'='1', 'fields.foo.end'='10' )""") t = t_env.sql_query("SELECT foo, lag(foo) OVER w AS lagfoo FROM datagen WINDOW w AS (ORDER BY message_time)") t_env.execute_sql("CREATE TABLE output (foo INT, lagfoo INT) WITH ('connector' = 'print')") t.execute_insert("output") {code} This results in {code:java} +I(1,1) // Expected (1, null) +I(2,2) // Expected (2, 1) +I(3,3) // Expected (3, 2) +I(4,4) // and so on +I(5,5) +I(6,6) +I(7,7) +I(8,8) +I(9,9) +I(10,10) {code} was: The lag aggregate function seems to always return the current row and not the row one lagged behind: {code:java} from pyflink.table import EnvironmentSettings, StreamTableEnvironment env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build() t_env = StreamTableEnvironment.create( environment_settings=env_settings) t_env.execute_sql(""" CREATE TABLE datagen ( foo INT, message_time AS to_timestamp(from_unixtime(foo)), WATERMARK FOR message_time AS message_time ) WITH ( 'connector' = 'datagen', 'rows-per-second'='3', 'fields.foo.kind'='sequence', 'fields.foo.start'='1', 'fields.foo.end'='10' )""") t = t_env.sql_query("SELECT foo, lag(foo) OVER w AS lagfoo FROM datagen WINDOW w AS (ORDER BY message_time)") 
t_env.execute_sql("CREATE TABLE output (foo INT, lagfoo INT) WITH ('connector' = 'print')") t.execute_insert("output") {code} This results in {code:java} +I(1,1) // Expected (1, null) +I(2,2) // Expected (2, 1) +I(3,3) // Expected (3, 2) +I(4,4) // and so on +I(5,5) +I(6,6) +I(7,7) +I(8,8) +I(9,9) +I(10,10) {code} > Lag aggregate function does not return lag, but current row > --- > > Key: FLINK-20100 > URL: https://issues.apache.org/jira/browse/FLINK-20100 > Project: Flink > Issue Type: Bug > Components: API / Python >Affects Versions: 1.11.2 >Reporter: Thilo Schneider >Priority: Major > > The lag aggregate function seems to always return the current row and not the > row one lagged behind: > {code:java} > from pyflink.table import EnvironmentSettings, StreamTableEnvironment > env_settings = > EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build() > t_env = StreamTableEnvironment.create( environment_settings=env_settings) > t_env.execute_sql(""" > CREATE TABLE datagen ( > foo INT, > message_time AS to_timestamp(from_unixtime(foo)), > WATERMARK FOR message_time AS message_time > ) WITH ( > 'connector' = 'datagen', > 'rows-per-second'='3', > 'fields.foo.kind'='sequence', > 'fields.foo.start'='1', > 'fields.foo.end'='10' > )""") > t = t_env.sql_query("SELECT foo, lag(foo) OVER w AS lagfoo FROM datagen > WINDOW w AS (ORDER BY message_time)") > t_env.execute_sql("CREATE TABLE output (foo INT, lagfoo INT) WITH > ('connector' = 'print')") > t.execute_insert("output") > {code} > This results in > {code:java} > +I(1,1) // Expected (1, null) > +I(2,2) // Expected (2, 1) > +I(3,3) // Expected (3, 2) > +I(4,4) // and so on > +I(5,5) > +I(6,6) > +I(7,7) > +I(8,8) > +I(9,9) > +I(10,10) > {code} > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-20100) Lag aggregate function does not return lag, but current row
Thilo Schneider created FLINK-20100: --- Summary: Lag aggregate function does not return lag, but current row Key: FLINK-20100 URL: https://issues.apache.org/jira/browse/FLINK-20100 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: 1.11.2 Reporter: Thilo Schneider The lag aggregate function seems to always return the current row and not the row one lagged behind: {code:java} from pyflink.table import EnvironmentSettings, StreamTableEnvironment env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build() t_env = StreamTableEnvironment.create( environment_settings=env_settings) t_env.execute_sql(""" CREATE TABLE datagen ( foo INT, message_time AS to_timestamp(from_unixtime(foo)), WATERMARK FOR message_time AS message_time ) WITH ( 'connector' = 'datagen', 'rows-per-second'='3', 'fields.foo.kind'='sequence', 'fields.foo.start'='1', 'fields.foo.end'='10' )""") t = t_env.sql_query("SELECT foo, lag(foo) OVER w AS lagfoo FROM datagen WINDOW w AS (ORDER BY message_time)") t_env.execute_sql("CREATE TABLE output (foo INT, lagfoo INT) WITH ('connector' = 'print')") t.execute_insert("output") {code} This results in {code:java} +I(1,1) // Expected (1, null) +I(2,2) // Expected (2, 1) +I(3,3) // Expected (3, 2) +I(4,4) // and so on +I(5,5) +I(6,6) +I(7,7) +I(8,8) +I(9,9) +I(10,10) {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
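For reference, the LAG semantics the report expects (offset 1, default NULL) can be sketched as a plain-Python reference implementation (hypothetical helper, not PyFlink API):

```python
def lag(rows, offset=1, default=None):
    """Reference semantics of SQL LAG: the value from `offset` rows earlier
    in the window ordering, or `default` when no such row exists."""
    return [rows[i - offset] if i >= offset else default
            for i in range(len(rows))]

foos = list(range(1, 11))
expected = list(zip(foos, lag(foos)))
# First pairs: (1, None), (2, 1), (3, 2), matching the bug report's
# expectation, whereas the planner currently emits (1, 1), (2, 2), (3, 3).
print(expected[:3])
```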
[GitHub] [flink] flinkbot edited a comment on pull request #14006: [FLINK-19546][doc] Add documentation for native Kubernetes HA
flinkbot edited a comment on pull request #14006: URL: https://github.com/apache/flink/pull/14006#issuecomment-724437611 ## CI report: * 19fdb8c1ce84f1a0c8b432331e8da5678fd98383 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9487) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-15906) physical memory exceeded causing being killed by yarn
[ https://issues.apache.org/jira/browse/FLINK-15906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17230365#comment-17230365 ] Xintong Song commented on FLINK-15906: -- Hi [~清月], If the problem does not happen frequently, I would suggest to first try configure a larger JVM overhead memory size. The configuration options are `taskmanager.memory.jvm-overhead.[min|max|fraction]`. https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_setup.html#capped-fractionated-components > physical memory exceeded causing being killed by yarn > - > > Key: FLINK-15906 > URL: https://issues.apache.org/jira/browse/FLINK-15906 > Project: Flink > Issue Type: Bug > Components: Deployment / YARN >Reporter: liupengcheng >Priority: Major > > Recently, we encoutered this issue when testing TPCDS query with 100g data. > I first meet this issue when I only set the > `taskmanager.memory.total-process.size` to `4g` with `-tm` option. Then I try > to increase the jvmOverhead size with following arguments, but still failed. > {code:java} > taskmanager.memory.jvm-overhead.min: 640m > taskmanager.memory.jvm-metaspace: 128m > taskmanager.memory.task.heap.size: 1408m > taskmanager.memory.framework.heap.size: 128m > taskmanager.memory.framework.off-heap.size: 128m > taskmanager.memory.managed.size: 1408m > taskmanager.memory.shuffle.max: 256m > {code} > {code:java} > java.lang.Exception: [2020-02-05 11:31:32.345]Container > [pid=101677,containerID=container_e08_1578903621081_4785_01_51] is > running 46342144B beyond the 'PHYSICAL' memory limit. Current usage: 4.04 GB > of 4 GB physical memory used; 17.68 GB of 40 GB virtual memory used. Killing > container.java.lang.Exception: [2020-02-05 11:31:32.345]Container > [pid=101677,containerID=container_e08_1578903621081_4785_01_51] is > running 46342144B beyond the 'PHYSICAL' memory limit. Current usage: 4.04 GB > of 4 GB physical memory used; 17.68 GB of 40 GB virtual memory used. 
Killing > container.Dump of the process-tree for > container_e08_1578903621081_4785_01_51 : |- PID PPID PGRPID SESSID > CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) > RSSMEM_USAGE(PAGES) FULL_CMD_LINE |- 101938 101677 101677 101677 (java) 25762 > 3571 18867417088 1059157 /opt/soft/openjdk1.8.0/bin/java > -Dhadoop.root.logfile=syslog -Xmx1610612736 -Xms1610612736 > -XX:MaxDirectMemorySize=402653184 -XX:MaxMetaspaceSize=134217728 > -Dlog.file=/home/work/hdd5/yarn/zjyprc-analysis/nodemanager/application_1578903621081_4785/container_e08_1578903621081_4785_01_51/taskmanager.log > -Dlog4j.configuration=file:./log4j.properties > org.apache.flink.yarn.YarnTaskExecutorRunner -D > taskmanager.memory.shuffle.max=268435456b -D > taskmanager.memory.framework.off-heap.size=134217728b -D > taskmanager.memory.framework.heap.size=134217728b -D > taskmanager.memory.managed.size=1476395008b -D taskmanager.cpu.cores=1.0 -D > taskmanager.memory.task.heap.size=1476395008b -D > taskmanager.memory.task.off-heap.size=0b -D > taskmanager.memory.shuffle.min=268435456b --configDir . 
> -Djobmanager.rpc.address=zjy-hadoop-prc-st2805.bj -Dweb.port=0 > -Dweb.tmpdir=/tmp/flink-web-4bf6cd3a-a6e1-4b46-b140-b8ac7bdffbeb > -Djobmanager.rpc.port=36769 -Dtaskmanager.memory.managed.size=1476395008b > -Drest.address=zjy-hadoop-prc-st2805.bj |- 101677 101671 101677 101677 (bash) > 1 1 118030336 733 /bin/bash -c /opt/soft/openjdk1.8.0/bin/java > -Dhadoop.root.logfile=syslog -Xmx1610612736 -Xms1610612736 > -XX:MaxDirectMemorySize=402653184 -XX:MaxMetaspaceSize=134217728 > -Dlog.file=/home/work/hdd5/yarn/zjyprc-analysis/nodemanager/application_1578903621081_4785/container_e08_1578903621081_4785_01_51/taskmanager.log > -Dlog4j.configuration=file:./log4j.properties > org.apache.flink.yarn.YarnTaskExecutorRunner -D > taskmanager.memory.shuffle.max=268435456b -D > taskmanager.memory.framework.off-heap.size=134217728b -D > taskmanager.memory.framework.heap.size=134217728b -D > taskmanager.memory.managed.size=1476395008b -D taskmanager.cpu.cores=1.0 -D > taskmanager.memory.task.heap.size=1476395008b -D > taskmanager.memory.task.off-heap.size=0b -D > taskmanager.memory.shuffle.min=268435456b --configDir . > -Djobmanager.rpc.address=zjy-hadoop-prc-st2805.bj -Dweb.port=0 > -Dweb.tmpdir=/tmp/flink-web-4bf6cd3a-a6e1-4b46-b140-b8ac7bdffbeb > -Djobmanager.rpc.port=36769 -Dtaskmanager.memory.managed.size=1476395008b > -Drest.address=zjy-hadoop-prc-st2805.bj 1> > /home/work/hdd5/yarn/zjyprc-analysis/nodemanager/application_1578903621081_4785/container_e08_1578903621081_4785_01_51/taskmanager.out > 2> > /home/work/hdd5/yarn/zjyprc-analysis/nodemanager/application_1578903621081_4785/container_e08_1578903621081_4785_01_51/taskmanager.err > {code} > I
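Following Xintong's suggestion above, a flink-conf.yaml fragment that raises the JVM overhead bounds might look like this (the values are illustrative assumptions, not tuned recommendations):

```yaml
# JVM overhead is a capped-fractionated component: it is derived as a
# fraction of the total process size, then clamped into [min, max]
# (see the memory setup docs linked in the comment above).
taskmanager.memory.jvm-overhead.min: 1g
taskmanager.memory.jvm-overhead.max: 2g
taskmanager.memory.jvm-overhead.fraction: 0.2
```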
[GitHub] [flink] flinkbot edited a comment on pull request #14042: [FLINK-19300][flink-core] Fix input stream read to prevent heap based timer loss
flinkbot edited a comment on pull request #14042: URL: https://github.com/apache/flink/pull/14042#issuecomment-725744439 ## CI report: * 789adc9519b48d6474d12b282d529ea0a024bfc9 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9484) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #14032: [FLINK-19863][tests] check embedded zookeeper status when start/close hbase resource
flinkbot edited a comment on pull request #14032: URL: https://github.com/apache/flink/pull/14032#issuecomment-725359904 ## CI report: * b201454a7b90b9f332263f5e2b7013d67d11a8fe Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9464) * e4560ca82e9afc677177af015c3d9298296d4a77 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9488) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #14006: [FLINK-19546][doc] Add documentation for native Kubernetes HA
flinkbot edited a comment on pull request #14006: URL: https://github.com/apache/flink/pull/14006#issuecomment-724437611 ## CI report: * 1793e46f94174a314f182dbf7845cd229d014d70 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9462) * 19fdb8c1ce84f1a0c8b432331e8da5678fd98383 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9487) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #14032: [FLINK-19863][tests] check embedded zookeeper status when start/close hbase resource
flinkbot edited a comment on pull request #14032: URL: https://github.com/apache/flink/pull/14032#issuecomment-725359904 ## CI report: * b201454a7b90b9f332263f5e2b7013d67d11a8fe Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9464) * e4560ca82e9afc677177af015c3d9298296d4a77 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #14006: [FLINK-19546][doc] Add documentation for native Kubernetes HA
flinkbot edited a comment on pull request #14006: URL: https://github.com/apache/flink/pull/14006#issuecomment-724437611 ## CI report: * 1793e46f94174a314f182dbf7845cd229d014d70 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9462) * 19fdb8c1ce84f1a0c8b432331e8da5678fd98383 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] leonardBang commented on a change in pull request #14027: [FLINK-20074][table-planner-blink] Fix can't generate plan when joining on changelog source without updates
leonardBang commented on a change in pull request #14027: URL: https://github.com/apache/flink/pull/14027#discussion_r521808438 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecJoin.scala ## @@ -57,6 +57,22 @@ class StreamExecJoin( with StreamPhysicalRel with StreamExecNode[RowData] { + /** + * This is mainly used in `FlinkChangelogModeInferenceProgram.SatisfyUpdateKindTraitVisitor`. + * If the unique key of input contains join key, then it can support ignoring UPDATE_BEFORE. + * Otherwise, it can't ignore UPDATE_BEFORE. For example, if the input schema is [id, name, cnt] + * with the unique key (id). The join key is (id, name), then an insert and update on the id: Review comment: ``` * with the unique key (id). The join key is (id, name), then an insert and update on the id: ``` I think the current logic is "unique key contains join key" rather than "unique key is contained in join key"; maybe this is an existing bug: `inputUniqueKeys.exists { uniqueKey => joinKeys.forall(uniqueKey.toArray.contains(_)) }` Ignore this comment. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #13834: [FLINK-19872][csv] Fix CSV format is unable to parse millisecond for TIME type
flinkbot edited a comment on pull request #13834: URL: https://github.com/apache/flink/pull/13834#issuecomment-718406146 ## CI report: * dc0a2a29d0fba59d7b7769353bd4d91aa1307ecb Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8741) * 63f6644db59a1a512ceef894317b3bf29355b7a0 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9485) * 6dd7a167a257fe6379340600a53ab4262c35f62f Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9486) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] leonardBang commented on a change in pull request #14027: [FLINK-20074][table-planner-blink] Fix can't generate plan when joining on changelog source without updates
leonardBang commented on a change in pull request #14027: URL: https://github.com/apache/flink/pull/14027#discussion_r521808720 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala ## @@ -57,15 +57,19 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti // step2: satisfy UpdateKind trait val rootModifyKindSet = getModifyKindSet(rootWithModifyKindSet) // use the required UpdateKindTrait from parent blocks -val requiredUpdateKindTraits = if (context.isUpdateBeforeRequired) { - Seq(UpdateKindTrait.BEFORE_AND_AFTER) -} else if (rootModifyKindSet.isInsertOnly) { - Seq(UpdateKindTrait.NONE) +val requiredUpdateKindTraits = if (rootModifyKindSet.contains(ModifyKind.UPDATE)) { + if (context.isUpdateBeforeRequired) { +Seq(UpdateKindTrait.BEFORE_AND_AFTER) + } else { +// update_before is not required, and input contains updates +// try ONLY_UPDATE_AFTER first, and then BEFORE_AND_AFTER +Seq(UpdateKindTrait.ONLY_UPDATE_AFTER, UpdateKindTrait.BEFORE_AND_AFTER) + } } else { - // update_before is not required, and input contains updates - // try ONLY_UPDATE_AFTER first, and then BEFORE_AND_AFTER - Seq(UpdateKindTrait.ONLY_UPDATE_AFTER, UpdateKindTrait.BEFORE_AND_AFTER) + // there is no updates + Seq(UpdateKindTrait.NONE) } + Review comment: delete this line ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecJoin.scala ## @@ -57,6 +57,22 @@ class StreamExecJoin( with StreamPhysicalRel with StreamExecNode[RowData] { + /** + * This is mainly used in `FlinkChangelogModeInferenceProgram.SatisfyUpdateKindTraitVisitor`. + * If the unique key of input contains join key, then it can support ignoring UPDATE_BEFORE. + * Otherwise, it can't ignore UPDATE_BEFORE. For example, if the input schema is [id, name, cnt] + * with the unique key (id). 
The join key is (id, name), then an insert and update on the id: Review comment: ``` * with the unique key (id). The join key is (id, name), then an insert and update on the id: ``` I think current logic is unique key contains join key rather than unique key is contained in join key, may this is an existed bug `inputUniqueKeys.exists { uniqueKey => joinKeys.forall(uniqueKey.toArray.contains(_)) }` ## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala ## @@ -281,25 +281,40 @@ class TableScanTest extends TableTestBase { @Test def testJoinOnChangelogSource(): Unit = { +verifyJoinOnSource("I,UB,UA") + } + + @Test + def testJoinOnNoUpdateSource(): Unit = { +verifyJoinOnSource("I,D") + } + + @Test + def testJoinOnUpsertSource(): Unit = { +verifyJoinOnSource("UA,D") + } + + private def verifyJoinOnSource(changelogMode: String): Unit = { util.addTable( """ - |CREATE TABLE orders ( - | amount BIGINT, - | currency STRING - |) WITH ( - | 'connector' = 'values', - | 'changelog-mode' = 'I' - |) - |""".stripMargin) +|CREATE TABLE orders ( +| amount BIGINT, +| currency STRING +|) WITH ( +| 'connector' = 'values', +| 'changelog-mode' = 'I' +|) +|""".stripMargin) util.addTable( - """ - |CREATE TABLE rates_history ( - | currency STRING, - | rate BIGINT - |) WITH ( - | 'connector' = 'values', - | 'changelog-mode' = 'I,UB,UA' - |) + s""" +|CREATE TABLE rates_history ( +| currency STRING, +| rate BIGINT, +| PRIMARY KEY (currency) NOT ENFORCED Review comment: could you change the join key to `(currency, currency_no)` to check the join key contains unique key case. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
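The containment check under discussion, Scala's `inputUniqueKeys.exists { uniqueKey => joinKeys.forall(uniqueKey.toArray.contains(_)) }`, can be replayed in plain Python to see which direction it actually tests (an illustrative sketch, not planner code):

```python
def supports_only_update_after(input_unique_keys, join_keys):
    """Mirrors the Scala check: true iff SOME unique key is a superset of
    the join keys, i.e. every join key column appears in that unique key."""
    return any(set(join_keys) <= set(uk) for uk in input_unique_keys)

# Schema [id, name, cnt] with unique key (id), join key (id, name):
# the unique key does NOT contain all join key columns, so UPDATE_BEFORE
# cannot be ignored under this check.
print(supports_only_update_after([{"id"}], ["id", "name"]))

# Unique key (id, name) covering join key (id): check passes.
print(supports_only_update_after([{"id", "name"}], ["id"]))
```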
[jira] [Commented] (FLINK-20082) Cannot start Flink application due to "cannot assign instance of java.lang.invoke.SerializedLambda to type scala.Function1
[ https://issues.apache.org/jira/browse/FLINK-20082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17230341#comment-17230341 ] ChangZhuo Chen (陳昌倬) commented on FLINK-20082: -- There are some findings for this issue: * `-Dsun.io.serialization.extendedDebugInfo=true` will trigger this problem. ** The previously good jar has this issue when we set `-Dsun.io.serialization.extendedDebugInfo=true`. * Without `-Dsun.io.serialization.extendedDebugInfo=true`, using ValueStateDescriptor with a default value will cause this issue.
{code:java}
val descriptor = new ValueStateDescriptor[Long]("last_event_time", classOf[Long], 0L) // Remove 0L arg, and the problem will be gone.
...
...
val ttlConfig = StateTtlConfig
  .newBuilder(Time.days(14))
  .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
  .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
  .cleanupFullSnapshot()
  .build()
descriptor.enableTimeToLive(ttlConfig)
{code}
> Cannot start Flink application due to "cannot assign instance of > java.lang.invoke.SerializedLambda to type scala.Function1 > -- > > Key: FLINK-20082 > URL: https://issues.apache.org/jira/browse/FLINK-20082 > Project: Flink > Issue Type: Bug > Components: API / DataStream, API / Scala >Affects Versions: 1.11.2 > Environment: * Flink 1.11.2 > * Java 11 > * Scala 2.12.11 >Reporter: ChangZhuo Chen (陳昌倬) >Priority: Major > Attachments: Flink-1.11.2_Scala-2.11.log, > Flink-1.11.2_Scala-2.12.log, image-20201110-060934.png > > > Hi, > * Our Flink application (Java 11 + Scala 2.12) has the following problem > when executing it. It cannot be run in Flink cluster. > * The problem is similar to > https://issues.apache.org/jira/browse/SPARK-25047, so maybe the same fix > shall be implemented in Flink? > !image-20201110-060934.png! -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot edited a comment on pull request #14040: [FLINK-20097][checkpointing] Race conditions in InputChannel.ChannelStatePersister
flinkbot edited a comment on pull request #14040: URL: https://github.com/apache/flink/pull/14040#issuecomment-725585398 ## CI report: * 5f1010916f552c9a4ae623a7154ccaaa2b1fdb76 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9483) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] flinkbot edited a comment on pull request #13834: [FLINK-19872][csv] Fix CSV format is unable to parse millisecond for TIME type
flinkbot edited a comment on pull request #13834: URL: https://github.com/apache/flink/pull/13834#issuecomment-718406146 ## CI report: * dc0a2a29d0fba59d7b7769353bd4d91aa1307ecb Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8741) * 63f6644db59a1a512ceef894317b3bf29355b7a0 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9485) * 6dd7a167a257fe6379340600a53ab4262c35f62f UNKNOWN
[jira] [Updated] (FLINK-19872) Fix CSV format is unable to parse millisecond for TIME type
[ https://issues.apache.org/jira/browse/FLINK-19872?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu updated FLINK-19872: Fix Version/s: 1.12.0 > Fix CSV format is unable to parse millisecond for TIME type > --- > > Key: FLINK-19872 > URL: https://issues.apache.org/jira/browse/FLINK-19872 > Project: Flink > Issue Type: Sub-task > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table > SQL / Ecosystem >Reporter: Fangliang Liu >Assignee: Fangliang Liu >Priority: Major > Labels: pull-request-available > Fix For: 1.12.0 >
[jira] [Updated] (FLINK-19872) Fix CSV format is unable to parse millisecond for TIME type
[ https://issues.apache.org/jira/browse/FLINK-19872?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu updated FLINK-19872: Summary: Fix CSV format is unable to parse millisecond for TIME type (was: Support to parse millisecond for TIME type in CSV format) > Fix CSV format is unable to parse millisecond for TIME type > --- > > Key: FLINK-19872 > URL: https://issues.apache.org/jira/browse/FLINK-19872 > Project: Flink > Issue Type: Sub-task > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table > SQL / Ecosystem >Reporter: Fangliang Liu >Assignee: Fangliang Liu >Priority: Major > Labels: pull-request-available >
[GitHub] [flink] wangyang0918 commented on a change in pull request #14006: [FLINK-19546][doc] Add documentation for native Kubernetes HA
wangyang0918 commented on a change in pull request #14006: URL: https://github.com/apache/flink/pull/14006#discussion_r521791380 ## File path: docs/ops/jobmanager_high_availability.md ## @@ -215,6 +215,74 @@ Starting zookeeper daemon on host localhost. $ bin/yarn-session.sh -n 2 +## Kubernetes Cluster High Availability +We are using a [deployment](https://kubernetes.io/docs/concepts/workloads/controllers/deployment/) for the JobManager when deploying Flink on Kubernetes cluster. The `replicas` is configured to 1, which means that a new JobManager will be launched to take over the leadership once the current one terminated exceptionally. Review comment: Hmm, I think "running Flink as a Kubernetes deployment" is not accurate, because we are running the Flink JobManager as a Kubernetes deployment. Do you think the following sentences make sense? When running the Flink JobManager as a Kubernetes [deployment](https://kubernetes.io/docs/concepts/workloads/controllers/deployment/), the `replica` count should be configured to 1 or greater. * The value `1` means that a new JobManager will be launched to take over leadership if the current one terminates exceptionally. * The value `N` (greater than 1) means that multiple JobManagers will be launched while one is active and the others are standby. Starting more than one JobManager will make the recovery faster.
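As a rough illustration of the `replicas` setting under discussion, a JobManager Deployment manifest could look like the following. This is a hand-written sketch with hypothetical names and labels, not the manifest that Flink's native Kubernetes integration actually generates:

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager        # hypothetical name
spec:
  replicas: 2                   # 1 = restart-on-failure only; >1 adds standby JobManagers
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
        - name: jobmanager
          image: flink:1.12     # hypothetical image tag
          args: ["jobmanager"]
```

With `replicas: 1` Kubernetes only relaunches a failed JobManager; with a larger count, standby instances are already running and leader election (via the HA services) picks the active one, which shortens recovery.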
[GitHub] [flink] wuchong commented on a change in pull request #14035: [FLINK-19340][table-runtime-blink] Support NestedRowData in RowDataSerializer#copy
wuchong commented on a change in pull request #14035: URL: https://github.com/apache/flink/pull/14035#discussion_r521786370 ## File path: flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/typeutils/RowDataSerializerTest.java ## @@ -205,44 +243,77 @@ private static GenericRowData createRow(Object f0, Object f1, Object f2, Object } private static boolean deepEqualsRowData( - RowData should, RowData is, RowDataSerializer serializer1, RowDataSerializer serializer2) { + RowData should, + RowData is, + RowDataSerializer serializer1, + RowDataSerializer serializer2) { + return deepEqualsRowData(should, is, serializer1, serializer2, false); + } + + private static boolean deepEqualsRowData( + RowData should, + RowData is, + RowDataSerializer serializer1, + RowDataSerializer serializer2, + boolean checkClass) { if (should.getArity() != is.getArity()) { return false; } + if (checkClass && should.getClass() != is.getClass()) { + return false; + } + BinaryRowData row1 = serializer1.toBinaryRow(should); Review comment: It always converts rows to `BinaryRowData` for comparison. Could we additionally compare the `RowData` directly when `checkClass` is true?
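The reviewer's suggestion, comparing the row objects directly when `checkClass` is true instead of only via their binary form, can be sketched like this. The `Row` interface below is a hypothetical stand-in for Flink's `RowData`, and the helper is an illustration of the idea rather than the actual test code:

```java
import java.util.Objects;

public class DeepEquals {
    // Hypothetical stand-in for RowData: arity plus positional field access.
    interface Row {
        int getArity();
        Object getField(int pos);
    }

    // Build a Row over the given field values (for demonstration purposes).
    static Row of(Object... fields) {
        return new Row() {
            public int getArity() { return fields.length; }
            public Object getField(int pos) { return fields[pos]; }
        };
    }

    // Compare class identity first (when requested), then arity, then
    // field-by-field, without round-tripping through a binary representation.
    static boolean deepEquals(Row should, Row is, boolean checkClass) {
        if (checkClass && should.getClass() != is.getClass()) {
            return false;
        }
        if (should.getArity() != is.getArity()) {
            return false;
        }
        for (int i = 0; i < should.getArity(); i++) {
            if (!Objects.equals(should.getField(i), is.getField(i))) {
                return false;
            }
        }
        return true;
    }
}
```

The direct comparison catches mismatches (e.g. a copy returning the wrong concrete row class) that disappear once both sides are normalized to `BinaryRowData`.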
[GitHub] [flink] flinkbot edited a comment on pull request #13834: [FLINK-19872][Formats/Csv] Support to parse millisecond for TIME type in CSV format
flinkbot edited a comment on pull request #13834: URL: https://github.com/apache/flink/pull/13834#issuecomment-718406146 ## CI report: * dc0a2a29d0fba59d7b7769353bd4d91aa1307ecb Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8741) * 63f6644db59a1a512ceef894317b3bf29355b7a0 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9485)
[jira] [Commented] (FLINK-19667) Add integration with AWS Glue
[ https://issues.apache.org/jira/browse/FLINK-19667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17230327#comment-17230327 ] Mohit Paliwal commented on FLINK-19667: --- Hi Robert - One more question: do you have a cutoff date for changes to be included in v1.12? > Add integration with AWS Glue > - > > Key: FLINK-19667 > URL: https://issues.apache.org/jira/browse/FLINK-19667 > Project: Flink > Issue Type: New Feature > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.11.0, 1.11.1, 1.11.2, 1.11.3 >Reporter: Mohit Paliwal >Priority: Major > Labels: AWS, Glue > Fix For: 1.12.0 > > > AWS Glue is releasing new features for the AWS Glue Data Catalog. This > request is to add a new format to launch an integration for Apache Flink with > AWS Glue Data Catalog
[jira] [Commented] (FLINK-18932) Add a "Overview" document under the "Python API" section
[ https://issues.apache.org/jira/browse/FLINK-18932?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17230326#comment-17230326 ] Wei Zhong commented on FLINK-18932: --- [~sjwiesman] Of course. Thank you! > Add a "Overview" document under the "Python API" section > - > > Key: FLINK-18932 > URL: https://issues.apache.org/jira/browse/FLINK-18932 > Project: Flink > Issue Type: Sub-task > Components: Documentation >Reporter: Wei Zhong >Assignee: Wei Zhong >Priority: Major >
[GitHub] [flink] dianfu commented on pull request #13920: [FLINK-19743] Add metric definitions in FLIP-33 and report some of them by default.
dianfu commented on pull request #13920: URL: https://github.com/apache/flink/pull/13920#issuecomment-725787288 @becketqin Fair enough for me. +1 from my side.
[jira] [Commented] (FLINK-19863) SQLClientHBaseITCase.testHBase failed with "java.io.IOException: Process failed due to timeout"
[ https://issues.apache.org/jira/browse/FLINK-19863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17230324#comment-17230324 ] Dian Fu commented on FLINK-19863: - https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9481=logs=91bf6583-3fb2-592f-e4d4-d79d79c3230a=3425d8ba-5f03-540a-c64b-51b8481bf7d6 > SQLClientHBaseITCase.testHBase failed with "java.io.IOException: Process > failed due to timeout" > --- > > Key: FLINK-19863 > URL: https://issues.apache.org/jira/browse/FLINK-19863 > Project: Flink > Issue Type: Bug > Components: Connectors / HBase >Affects Versions: 1.12.0 >Reporter: Dian Fu >Assignee: Leonard Xu >Priority: Critical > Labels: pull-request-available, test-stability > Fix For: 1.12.0 > > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=8541=logs=91bf6583-3fb2-592f-e4d4-d79d79c3230a=3425d8ba-5f03-540a-c64b-51b8481bf7d6 > {code} > 00:50:02,589 [main] INFO > org.apache.flink.tests.util.flink.FlinkDistribution [] - Stopping > Flink cluster. > 00:50:04,106 [main] INFO > org.apache.flink.tests.util.flink.FlinkDistribution [] - Stopping > Flink cluster. > 00:50:04,741 [main] INFO > org.apache.flink.tests.util.flink.LocalStandaloneFlinkResource [] - Backed up > logs to > /home/vsts/work/1/s/flink-end-to-end-tests/artifacts/flink-b3924665-1ac9-4309-8255-20f0dc94e7b9. > 00:50:04,788 [main] INFO > org.apache.flink.tests.util.hbase.LocalStandaloneHBaseResource [] - Stopping > HBase Cluster > 00:50:16,243 [main] ERROR > org.apache.flink.tests.util.hbase.SQLClientHBaseITCase [] - > > Test testHBase[0: > hbase-version:1.4.3](org.apache.flink.tests.util.hbase.SQLClientHBaseITCase) > failed with: > java.io.IOException: Process failed due to timeout. 
> at > org.apache.flink.tests.util.AutoClosableProcess$AutoClosableProcessBuilder.runBlocking(AutoClosableProcess.java:130) > at > org.apache.flink.tests.util.AutoClosableProcess$AutoClosableProcessBuilder.runBlocking(AutoClosableProcess.java:108) > at > org.apache.flink.tests.util.flink.FlinkDistribution.submitSQLJob(FlinkDistribution.java:221) > at > org.apache.flink.tests.util.flink.LocalStandaloneFlinkResource$StandaloneClusterController.submitSQLJob(LocalStandaloneFlinkResource.java:196) > at > org.apache.flink.tests.util.hbase.SQLClientHBaseITCase.executeSqlStatements(SQLClientHBaseITCase.java:215) > at > org.apache.flink.tests.util.hbase.SQLClientHBaseITCase.testHBase(SQLClientHBaseITCase.java:152) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] dianfu commented on a change in pull request #14041: [FLINK-20096][docs] Clean up PyFlink docs
dianfu commented on a change in pull request #14041: URL: https://github.com/apache/flink/pull/14041#discussion_r521771064 ## File path: docs/dev/python/index.zh.md ## @@ -22,3 +23,43 @@ KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. --> + + + +PyFlink is a language for building unified batch and streaming workloads. +This means real-time streaming pipelines, performing exploratory data +analysis at scale, building machine learning pipelines, and creating ETLs for a data platform. +If you're already familiar with Python and libraries such as Pandas, then PyFlink makes it simple +to leverage the full capabilities of the Apache Flink ecosystem. + +The PyFlink Table API makes it simple to write powerful relational queries for building reports and +ETL pipelines. +At the same time, the PyFlink DataStream API gives developers access to low-level control over +state and time, unlocking the full power of stream processing. + + + + +### Try PyFlink + +If you’re interested in playing around with Flink, try one of our tutorials: + +* [Intro to PyFlink Table API]({% link dev/python/table_api_tutorial.md %}) +* [Intro to PyFlink DataStream API]({% link dev/python/datastream_tutorial.md %}) Review comment: ```suggestion * [Intro to PyFlink DataStream API]({% link dev/python/datastream_tutorial.zh.md %}) ``` ## File path: docs/dev/python/index.zh.md ## @@ -22,3 +23,43 @@ KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. --> + + + +PyFlink is a language for building unified batch and streaming workloads. +This means real-time streaming pipelines, performing exploratory data +analysis at scale, building machine learning pipelines, and creating ETLs for a data platform. 
+If you're already familiar with Python and libraries such as Pandas, then PyFlink makes it simple +to leverage the full capabilities of the Apache Flink ecosystem. + +The PyFlink Table API makes it simple to write powerful relational queries for building reports and +ETL pipelines. +At the same time, the PyFlink DataStream API gives developers access to low-level control over +state and time, unlocking the full power of stream processing. + + + + +### Try PyFlink + +If you’re interested in playing around with Flink, try one of our tutorials: + +* [Intro to PyFlink Table API]({% link dev/python/table_api_tutorial.md %}) +* [Intro to PyFlink DataStream API]({% link dev/python/datastream_tutorial.md %}) + + + + +### Explore PyFlink + +The reference documentation covers all the details. Some starting points: + +* [PyFlink DataStream API]({% link dev/python/table-api-users-guide/index.md %}) Review comment: ```suggestion * [PyFlink DataStream API]({% link dev/python/table-api-users-guide/index.zh.md %}) ``` ## File path: docs/dev/python/index.zh.md ## @@ -22,3 +23,43 @@ KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. --> + + + +PyFlink is a language for building unified batch and streaming workloads. +This means real-time streaming pipelines, performing exploratory data +analysis at scale, building machine learning pipelines, and creating ETLs for a data platform. +If you're already familiar with Python and libraries such as Pandas, then PyFlink makes it simple +to leverage the full capabilities of the Apache Flink ecosystem. + +The PyFlink Table API makes it simple to write powerful relational queries for building reports and +ETL pipelines. +At the same time, the PyFlink DataStream API gives developers access to low-level control over +state and time, unlocking the full power of stream processing. 
+ + + + +### Try PyFlink + +If you’re interested in playing around with Flink, try one of our tutorials: + +* [Intro to PyFlink Table API]({% link dev/python/table_api_tutorial.md %}) +* [Intro to PyFlink DataStream API]({% link dev/python/datastream_tutorial.md %}) + + + + +### Explore PyFlink + +The reference documentation covers all the details. Some starting points: + +* [PyFlink DataStream API]({% link dev/python/table-api-users-guide/index.md %}) +* [PyFlink Table API SQL]({% link dev/python/datastream-api-users-guide/index.md %}) Review comment: ```suggestion * [PyFlink Table API SQL]({% link dev/python/datastream-api-users-guide/index.zh.md %}) ``` ## File path: docs/dev/python/index.zh.md ## @@ -22,3 +23,43 @@ KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. --> + + + +PyFlink is a language for building unified batch and streaming workloads. +This means real-time streaming pipelines, performing exploratory data +analysis at scale, building machine learning pipelines, and
[GitHub] [flink] flinkbot edited a comment on pull request #13834: [FLINK-19872][Formats/Csv] Support to parse millisecond for TIME type in CSV format
flinkbot edited a comment on pull request #13834: URL: https://github.com/apache/flink/pull/13834#issuecomment-718406146 ## CI report: * dc0a2a29d0fba59d7b7769353bd4d91aa1307ecb Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8741) * 63f6644db59a1a512ceef894317b3bf29355b7a0 UNKNOWN
[GitHub] [flink] pyscala commented on a change in pull request #13834: [FLINK-19872][Formats/Csv] Support to parse millisecond for TIME type in CSV format
pyscala commented on a change in pull request #13834: URL: https://github.com/apache/flink/pull/13834#discussion_r521769804 ## File path: flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDataSerDeSchemaTest.java ## @@ -150,6 +151,24 @@ public void testSerializeDeserializeCustomizedProperties() throws Exception { deserConfig, ";"); testField(STRING(), "null", "null", serConfig, deserConfig, ";"); // string because null literal has not been set + testField(TIME(3), "12:12:12.232", LocalTime.parse("12:12:12.232") , deserConfig , ";"); + testField(TIME(3), "12:12:12.232342", LocalTime.parse("12:12:12.232") , deserConfig , ";"); + testField(TIME(3), "12:12:12.23", LocalTime.parse("12:12:12.23") , deserConfig , ";"); + testField(TIME(2), "12:12:12.23", LocalTime.parse("12:12:12.23") , deserConfig , ";"); + testField(TIME(2), "12:12:12.232312", LocalTime.parse("12:12:12.23") , deserConfig , ";"); + testField(TIME(2), "12:12:12.2", LocalTime.parse("12:12:12.2") , deserConfig , ";"); + testField(TIME(1), "12:12:12.2", LocalTime.parse("12:12:12.2") , deserConfig , ";"); + testField(TIME(1), "12:12:12.2235", LocalTime.parse("12:12:12.2") , deserConfig , ";"); + testField(TIME(1), "12:12:12", LocalTime.parse("12:12:12") , deserConfig , ";"); + testField(TIME(0), "12:12:12", LocalTime.parse("12:12:12") , deserConfig , ";"); + testField(TIME(0), "12:12:12.45", LocalTime.parse("12:12:12") , deserConfig , ";"); Review comment: done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
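The expected semantics exercised by these test cases, namely that a `TIME(p)` value keeps at most `p` fractional-second digits, can be sketched with plain `java.time`. This is an illustration of the behavior only, not the CSV format's actual parsing code:

```java
import java.time.LocalTime;

public class TimePrecision {
    // Parse a time literal and keep at most `precision` fractional-second
    // digits, mirroring how a TIME(p) column drops extra sub-second digits.
    static String truncate(String time, int precision) {
        LocalTime t = LocalTime.parse(time);
        long unit = (long) Math.pow(10, 9 - precision); // nanos per kept digit unit
        return t.withNano((int) (t.getNano() / unit * unit)).toString();
    }

    public static void main(String[] args) {
        System.out.println(truncate("12:12:12.232342", 3)); // 12:12:12.232
        System.out.println(truncate("12:12:12.45", 0));     // 12:12:12
    }
}
```

So `12:12:12.232342` under `TIME(3)` keeps only the millisecond part, and under `TIME(0)` the whole fraction is dropped, matching the assertions in the test above.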
[GitHub] [flink] xccui commented on pull request #13974: [hotfix][doc] Fix a concurrent issue in testing.md
xccui commented on pull request #13974: URL: https://github.com/apache/flink/pull/13974#issuecomment-725765326 I've done that
[jira] [Commented] (FLINK-19864) TwoInputStreamTaskTest.testWatermarkMetrics failed with "expected:<1> but was:<-9223372036854775808>"
[ https://issues.apache.org/jira/browse/FLINK-19864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17230315#comment-17230315 ] Kezhu Wang commented on FLINK-19864: I think this is probably caused by the misuse of {{Thread.getState}} as a synchronization tool in {{StreamTaskTestHarness.waitForInputProcessing}}.
{code:java}
public void waitForInputProcessing() throws Exception {
    while (true) {
        checkForErrorInTaskThread();
        if (allInputConsumed()) {
            break;
        }
    }
    // then wait for the Task Thread to be in a blocked state
    // Check whether the state is blocked, this should be the case if it cannot
    // notifyNonEmpty more input, i.e. all currently available input has been processed.
    while (true) {
        Thread.State state = taskThread.getState();
        if (state == Thread.State.BLOCKED || state == Thread.State.TERMINATED
                || state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING) {
            break;
        }
        try {
            Thread.sleep(1);
        } catch (InterruptedException ignored) {}
    }
}
{code}
Here is what the javadoc says about {{Thread.getState}}:
{quote}
Returns the state of this thread. This method is designed for use in monitoring of the system state, not for synchronization control.
{quote}
Even though {{Thread.threadStatus}} is volatile in the JDK, it is not on the JVM side.
{code:c++}
// Write the thread status value to threadStatus field in java.lang.Thread java class.
void java_lang_Thread::set_thread_status(oop java_thread, java_lang_Thread::ThreadStatus status) {
  // The threadStatus is only present starting in 1.5
  if (_thread_status_offset > 0) {
    java_thread->int_field_put(_thread_status_offset, status);
  }
}
{code}
I can't give a reliable example to prove the JVM-side behavior without the help of an additional synchronization tool; it is a chicken-and-egg problem, to my knowledge. This is also not the case we encounter here, as we have explicit synchronization tools in this test case: {{ConcurrentLinkedList.size}} and {{ConcurrentLinkedList.poll}}.
Also, I didn't find an explicit blocking statement after {{ConcurrentLinkedList.poll}} and before {{inputWatermarkGauge.setCurrentWatermark}}. But *there are implicit blocking entry points: concurrent class loading.* I wrote the following code to verify this:
{code:java}
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class Main {
    private static final List<String> unloadedClassNames = Arrays.asList(
        "java.sql.DriverManager",
        "java.io.Console",
        "java.io.FileInputStream",
        "java.io.FilePermission");

    public static void main(String[] args) throws Exception {
        final CountDownLatch readyLatch = new CountDownLatch(1);
        final CountDownLatch classLoadingLatch = new CountDownLatch(1);
        final CountDownLatch doneLatch = new CountDownLatch(1);
        Thread pollingThread = new Thread(() -> {
            try {
                readyLatch.countDown();
                while (classLoadingLatch.getCount() != 0) {
                    Thread.yield();
                }
                unloadedClassNames.forEach(className -> {
                    try {
                        Class.forName(className);
                        Thread.yield();
                    } catch (Exception ex) {
                        ex.printStackTrace();
                        System.exit(2);
                    }
                });
                while (doneLatch.getCount() != 0) {
                    Thread.yield();
                }
            } catch (Exception ex) {
                ex.printStackTrace();
                System.exit(2);
            }
        });
        pollingThread.start();
        readyLatch.await();
        classLoadingLatch.countDown();
        unloadedClassNames.forEach(className -> {
            try {
                Class.forName(className);
            } catch (Exception ex) {
                ex.printStackTrace();
                System.exit(2);
            }
        });
        Thread.State pollingThreadState = pollingThread.getState();
        if (pollingThreadState != Thread.State.RUNNABLE) {
            System.err.format("polling thread state: %s\n", pollingThreadState);
            System.exit(1);
        }
        doneLatch.countDown();
        pollingThread.join();
    }
}
{code}
Here, I chose four classes, each of which has a static initialization block. The above code fails quite often, roughly 10% of the time, in my local
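A fix in the direction this analysis points to would replace the `Thread.getState()` polling with an explicit handshake, e.g. a latch that the task thread counts down once all input is consumed. The following is a generic sketch of that idea, not the actual harness change:

```java
import java.util.concurrent.CountDownLatch;

public class LatchHandshake {
    // Returns only after the worker thread has explicitly signalled completion,
    // regardless of any transient WAITING/BLOCKED states (class loading, GC,
    // etc.) the thread passes through along the way.
    static String run() throws InterruptedException {
        CountDownLatch inputProcessed = new CountDownLatch(1);
        Thread taskThread = new Thread(() -> {
            // ... process all available input here ...
            inputProcessed.countDown(); // explicit signal instead of state polling
        });
        taskThread.start();
        inputProcessed.await(); // deterministic wait on the signal
        taskThread.join();
        return "input processed";
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```

Because the signal is an explicit memory-visibility edge (`countDown`/`await`), the waiter cannot be fooled by a thread that is momentarily `WAITING` for an unrelated reason such as class loading.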
[jira] [Comment Edited] (FLINK-20082) Cannot start Flink application due to "cannot assign instance of java.lang.invoke.SerializedLambda to type scala.Function1
[ https://issues.apache.org/jira/browse/FLINK-20082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17230308#comment-17230308 ] ChangZhuo Chen (陳昌倬) edited comment on FLINK-20082 at 11/12/20, 1:02 AM: - [~gaoyunhaii], * The full call stack is [^Flink-1.11.2_Scala-2.12.log]. The Scala version is 2.12.11. * We tried Scala 2.11, and we got another error [^Flink-1.11.2_Scala-2.11.log]. The Scala version is 2.11.12. * We added `-Dsun.io.serialization.extendedDebugInfo=true` to get better debug information for this issue. was (Author: czchen): [~gaoyunhaii], * The full call stack is [^Flink-1.11.2_Scala-2.11.log]. The Scala version is 2.11.12. * We are still trying to identify which code causes the problem. Will update once we find the problem.
[jira] [Updated] (FLINK-20082) Cannot start Flink application due to "cannot assign instance of java.lang.invoke.SerializedLambda to type scala.Function1
[ https://issues.apache.org/jira/browse/FLINK-20082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ChangZhuo Chen (陳昌倬) updated FLINK-20082: - Environment: * Flink 1.11.2 * Java 11 * Scala 2.12.11 was: * Flink 1.11.2 * Java 11 * Scala 2.11.12
[jira] [Updated] (FLINK-20082) Cannot start Flink application due to "cannot assign instance of java.lang.invoke.SerializedLambda to type scala.Function1
[ https://issues.apache.org/jira/browse/FLINK-20082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ChangZhuo Chen (陳昌倬) updated FLINK-20082: - Attachment: Flink-1.11.2_Scala-2.12.log
[GitHub] [flink] flinkbot edited a comment on pull request #14040: [FLINK-20097][checkpointing] Race conditions in InputChannel.ChannelStatePersister
flinkbot edited a comment on pull request #14040: URL: https://github.com/apache/flink/pull/14040#issuecomment-725585398 ## CI report: * 690c631a452a67702d2191c5dc2b3146e7338322 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9480) * 5f1010916f552c9a4ae623a7154ccaaa2b1fdb76 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9483)
[GitHub] [flink] flinkbot edited a comment on pull request #14042: [FLINK-19300][flink-core] Fix input stream read to prevent heap based timer loss
flinkbot edited a comment on pull request #14042: URL: https://github.com/apache/flink/pull/14042#issuecomment-725744439 ## CI report: * 789adc9519b48d6474d12b282d529ea0a024bfc9 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9484)
[jira] [Commented] (FLINK-20082) Cannot start Flink application due to "cannot assign instance of java.lang.invoke.SerializedLambda to type scala.Function1
[ https://issues.apache.org/jira/browse/FLINK-20082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17230308#comment-17230308 ] ChangZhuo Chen (陳昌倬) commented on FLINK-20082: -- [~gaoyunhaii], * The full call stack is [^Flink-1.11.2_Scala-2.11.log]. The Scala version should be 2.11.12. * We are still trying to identify which code causes the problem. We will update this issue once we find it.
[GitHub] [flink] flinkbot commented on pull request #14042: [FLINK-19300][flink-core] Fix input stream read to prevent heap based timer loss
flinkbot commented on pull request #14042: URL: https://github.com/apache/flink/pull/14042#issuecomment-725744439 ## CI report: * 789adc9519b48d6474d12b282d529ea0a024bfc9 UNKNOWN
[jira] [Updated] (FLINK-20082) Cannot start Flink application due to "cannot assign instance of java.lang.invoke.SerializedLambda to type scala.Function1
[ https://issues.apache.org/jira/browse/FLINK-20082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ChangZhuo Chen (陳昌倬) updated FLINK-20082: - Attachment: Flink-1.11.2_Scala-2.11.log
[jira] [Updated] (FLINK-20082) Cannot start Flink application due to "cannot assign instance of java.lang.invoke.SerializedLambda to type scala.Function1
[ https://issues.apache.org/jira/browse/FLINK-20082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ChangZhuo Chen (陳昌倬) updated FLINK-20082: - Environment: * Flink 1.11.2 * Java 11 * Scala 2.11.12 was: * Flink 1.11.2 * Java 11 * Scala 2.12.11
[jira] [Commented] (FLINK-19300) Timer loss after restoring from savepoint
[ https://issues.apache.org/jira/browse/FLINK-19300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17230306#comment-17230306 ] Xiang Gao commented on FLINK-19300: --- Created a PR. In the non-versioned-payload case we need to push back the bytes that were already read. If we used DataInputView.readFully() while catching EOF, we would not know the correct number of bytes to push back. The fix does something similar to DataInputView.readFully(), but keeps track of the number of bytes read so that we can push back exactly that many when necessary. > Timer loss after restoring from savepoint > - > > Key: FLINK-19300 > URL: https://issues.apache.org/jira/browse/FLINK-19300 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 1.8.0 >Reporter: Xiang Gao >Assignee: Xiang Gao >Priority: Blocker > Labels: pull-request-available > Fix For: 1.12.0, 1.11.3 > > > While using heap-based timers, we are seeing occasional timer loss after > restoring program from savepoint, especially when using a remote savepoint > storage (s3). > After some investigation, the issue seems to be related to [this line in > deserialization|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/io/PostVersionedIOReadableWritable.java#L65]. > When trying to check the VERSIONED_IDENTIFIER, the input stream may not > guarantee filling the byte array, causing timers to be dropped for the > affected key group. > Should keep reading until expected number of bytes are actually read or if > end of the stream has been reached.
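The idea in the comment above can be sketched as follows. This is a hypothetical helper, not Flink's actual `PostVersionedIOReadableWritable` code: it reads like `readFully()` but returns the count of bytes actually consumed, so the caller can push back exactly that many bytes when the payload turns out to be non-versioned.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PushbackInputStream;

public class ReadWithCount {

    // Read until the buffer is full or EOF; return how many bytes were read.
    // Unlike DataInput.readFully(), this never throws EOFException, so the
    // caller always knows the exact number of bytes to push back.
    static int readFully(InputStream in, byte[] buf) throws IOException {
        int total = 0;
        while (total < buf.length) {
            int n = in.read(buf, total, buf.length - total);
            if (n == -1) {
                break; // EOF reached before the buffer was filled
            }
            total += n;
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = {1, 2, 3}; // shorter than the identifier we probe for
        PushbackInputStream in =
                new PushbackInputStream(new ByteArrayInputStream(data), 8);
        byte[] identifier = new byte[8];
        int read = readFully(in, identifier);
        // Only 3 bytes existed, so exactly 3 were read and can be pushed back.
        in.unread(identifier, 0, read);
        System.out.println(read);      // 3
        System.out.println(in.read()); // 1 -- the data is available again
    }
}
```

The key design point is returning the byte count instead of throwing on a short stream, which is what makes the push-back safe.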
[GitHub] [flink] flinkbot commented on pull request #14042: [FLINK-19300][flink-core] Fix input stream read to prevent heap based timer loss
flinkbot commented on pull request #14042: URL: https://github.com/apache/flink/pull/14042#issuecomment-725741764 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit 789adc9519b48d6474d12b282d529ea0a024bfc9 (Thu Nov 12 00:27:24 UTC 2020) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into to Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-19300) Timer loss after restoring from savepoint
[ https://issues.apache.org/jira/browse/FLINK-19300?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-19300: --- Labels: pull-request-available (was: )
[GitHub] [flink] Acesine opened a new pull request #14042: [FLINK-19300][flink-core] Fix input stream read to prevent heap based timer loss
Acesine opened a new pull request #14042: URL: https://github.com/apache/flink/pull/14042
## What is the purpose of the change
This pull request fixes a timer-loss issue when deserializing heap-based timers by fully reading the input stream. Currently we depend on InputStream.read(byte[]) to read in the VERSIONED_IDENTIFIER bytes; however, the read method of java.io.InputStream does not guarantee reading the expected number of bytes. This change fixes that by continuing to read until all expected bytes are read or EOF is reached. In the case of a non-versioned payload, only the bytes that were actually read are pushed back.
## Brief change log
- Attempt to read into the versioned-identifier byte array as much as possible from the input stream.
- In the case of non-versioned input, only push back the bytes that were read.
## Verifying this change
This change added tests and can be verified as follows:
- A unit test is added to verify the behavior.
- Manually tested the restore behavior after the change and verified no timer loss.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (yes)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes)
- The S3 file system connector: (no)
## Documentation
- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)
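The premise of this PR, that `InputStream.read(byte[])` may legitimately return fewer bytes than the buffer holds, can be demonstrated with a small standalone snippet (not Flink code; the wrapper class is purely illustrative of how a remote or S3-backed stream may behave):

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class ShortReadDemo {
    // A stream that hands out at most one byte per read() call. This is
    // allowed by the InputStream contract and mimics a slow remote stream.
    static class OneByteStream extends FilterInputStream {
        OneByteStream(InputStream in) { super(in); }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            return super.read(b, off, Math.min(len, 1)); // short-read on purpose
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] payload = "VERSIONED".getBytes(StandardCharsets.US_ASCII);
        InputStream in = new OneByteStream(new ByteArrayInputStream(payload));
        byte[] buf = new byte[payload.length];
        int n = in.read(buf); // NOT guaranteed to fill buf
        System.out.println(n); // 1 -- only one byte arrived in this call
    }
}
```

A single `read(buf)` here fills only one byte even though nine are available, which is exactly why the identifier check must loop until the buffer is full or EOF.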
[jira] [Created] (FLINK-20099) HeapStateBackend checkpoint error hidden under cryptic message
Nico Kruber created FLINK-20099: --- Summary: HeapStateBackend checkpoint error hidden under cryptic message Key: FLINK-20099 URL: https://issues.apache.org/jira/browse/FLINK-20099 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing, Runtime / State Backends Affects Versions: 1.11.2 Reporter: Nico Kruber Attachments: Screenshot_20201112_001331.png
When the memory state back-end hits a certain size, it fails to permit checkpoints. Even though a very detailed exception is thrown at its source, this is neither logged nor shown in the UI:
* Logs just contain:
{code:java}
00:06:41.462 [jobmanager-future-thread-14] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline checkpoint 2 by task 8eb303cd3196310cb2671212f4ed013c of job c9b7a410bd3143864ca23ba89595d878 at 6a73bcf2-46b6-4735-a616-fdf09ff1471c @ localhost (dataPort=-1).
{code}
* UI: (also see the attached Screenshot_20201112_001331.png)
{code:java}
Failure Message: The job has failed.
{code}
-> this isn't even true: the job is still running fine!
Debugging into {{PendingCheckpoint#abort()}} reveals that the causing exception is actually still in there but the detailed information from it is just never used. For reference, this is what is available there and should be logged or shown:
{code:java}
java.lang.Exception: Could not materialize checkpoint 2 for operator aggregates -> (Sink: sink-agg-365, Sink: sink-agg-180, Sink: sink-agg-45, Sink: sink-agg-30) (4/4).
	at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:191)
	at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:138)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=6122737 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.
	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
	at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:479)
	at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:50)
	at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:102)
	... 3 more
Caused by: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=6122737 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.
	at org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory.checkSize(MemCheckpointStreamFactory.java:64)
	at org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory$MemoryCheckpointOutputStream.closeAndGetBytes(MemCheckpointStreamFactory.java:145)
	at org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory$MemoryCheckpointOutputStream.closeAndGetHandle(MemCheckpointStreamFactory.java:126)
	at org.apache.flink.runtime.state.CheckpointStreamWithResultProvider$PrimaryStreamOnly.closeAndFinalizeCheckpointStreamResult(CheckpointStreamWithResultProvider.java:77)
	at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:199)
	at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:158)
	at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:476)
	... 5 more
{code}
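The ticket's complaint is that the informative {{IOException}} sits at the bottom of a cause chain but only the top-level message ever reaches logs and UI. A generic way to surface it, sketched here with hypothetical names rather than Flink's actual fix, is to walk the cause chain to the root before logging:

```java
import java.io.IOException;
import java.util.concurrent.ExecutionException;

public class RootCause {

    // Walk the cause chain to its end, guarding against self-referential causes.
    static Throwable rootCause(Throwable t) {
        Throwable cur = t;
        while (cur.getCause() != null && cur.getCause() != cur) {
            cur = cur.getCause();
        }
        return cur;
    }

    public static void main(String[] args) {
        // Rebuild the nesting seen in the ticket's trace:
        // Exception -> ExecutionException -> IOException (the useful one).
        Exception leaf = new IOException(
                "Size of the state is larger than the maximum permitted memory-backed state.");
        Exception wrapped = new RuntimeException(
                "Could not materialize checkpoint", new ExecutionException(leaf));
        // Logging rootCause(wrapped).getMessage() would show the real reason
        // instead of a bare "Decline checkpoint ..." line.
        System.out.println(rootCause(wrapped).getMessage());
    }
}
```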
[GitHub] [flink] flinkbot edited a comment on pull request #14040: [FLINK-20097][checkpointing] Race conditions in InputChannel.ChannelStatePersister
flinkbot edited a comment on pull request #14040: URL: https://github.com/apache/flink/pull/14040#issuecomment-725585398 ## CI report: * d95bf0fdccd16306024e3d37b81f7704cd618dc4 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9478) * 690c631a452a67702d2191c5dc2b3146e7338322 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9480) * 5f1010916f552c9a4ae623a7154ccaaa2b1fdb76 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9483)
[jira] [Resolved] (FLINK-9300) Improve error message when in-memory state is too large
[ https://issues.apache.org/jira/browse/FLINK-9300?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Kruber resolved FLINK-9300. Resolution: Cannot Reproduce Closing due to inactivity. > Improve error message when in-memory state is too large > --- > > Key: FLINK-9300 > URL: https://issues.apache.org/jira/browse/FLINK-9300 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Affects Versions: 1.4.2 >Reporter: Kenneth William Krugler >Priority: Minor > > Currently in the {{MemCheckpointStreamFactory.checkSize()}} method, it can > throw an {{IOException}} via: > {code:java} > throw new IOException( > "Size of the state is larger than the maximum permitted memory-backed state. > Size=" > + size + " , maxSize=" + maxSize > + " . Consider using a different state backend, like the File System State > backend.");{code} > But this will happen even if you’re using the File System State backend. > This came up here: > [https://stackoverflow.com/questions/50149005/ioexception-size-of-the-state-is-larger-than-the-maximum-permitted-memory-backe] > We could change the message to be: > {quote}Please consider increasing the maximum permitted memory size, > increasing the task manager parallelism, or using a non-memory-based state > backend such as RocksDB. > {quote} > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot edited a comment on pull request #14040: [FLINK-20097][checkpointing] Race conditions in InputChannel.ChannelStatePersister
flinkbot edited a comment on pull request #14040: URL: https://github.com/apache/flink/pull/14040#issuecomment-725585398 ## CI report: * d95bf0fdccd16306024e3d37b81f7704cd618dc4 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9478) * 690c631a452a67702d2191c5dc2b3146e7338322 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9480) * 5f1010916f552c9a4ae623a7154ccaaa2b1fdb76 UNKNOWN
[GitHub] [flink-statefun] gvauvert opened a new pull request #173: Fix some typos in Java code in documentation
gvauvert opened a new pull request #173: URL: https://github.com/apache/flink-statefun/pull/173 Hi, I propose to fix obvious typos in the Java code in the documentation. Thanks.
[GitHub] [flink] flinkbot edited a comment on pull request #14028: [FLINK-20020][client] Make UnsuccessfulExecutionException part of the JobClient.getJobExecutionResult() contract
flinkbot edited a comment on pull request #14028: URL: https://github.com/apache/flink/pull/14028#issuecomment-725222918 ## CI report: * 7eeaee3de15449732a05aa075b10cbbf9e152c2a Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9477)
[jira] [Updated] (FLINK-20097) Race conditions in InputChannel.ChannelStatePersister
[ https://issues.apache.org/jira/browse/FLINK-20097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Roman Khachatryan updated FLINK-20097: -- Priority: Critical (was: Major) > Race conditions in InputChannel.ChannelStatePersister > - > > Key: FLINK-20097 > URL: https://issues.apache.org/jira/browse/FLINK-20097 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing, Runtime / Network >Affects Versions: 1.12.0 >Reporter: Roman Khachatryan >Assignee: Roman Khachatryan >Priority: Critical > Labels: pull-request-available > Fix For: 1.12.0 > > > In InputChannel.ChannelStatePersister, stopPersisting() and checkForBarrier() > always update pendingCheckpointBarrierId, potentially overwriting newer id > (or BARRIER_RECEIVED value) with an old one. > For stopPersisting(), consider a case: > # Two consecutive UC barriers arrive at the same channel (1st being stale at > some point) > # In RemoteInputChannel.onBuffer, netty thread updates > pendingCheckpointBarrierId to BARRIER_RECEIVED > # Task thread processes the 1st barrier and triggers a checkpoint > Task thread processes the 2nd barrier and aborts 1st checkpoint, calling > stopPersisting() from UC controller and setting pendingCheckpointBarrierId to > CHECKPOINT_COMPLETED > # Task thread starts 2nd checkpoint and calls startPersisting() setting > pendingCheckpointBarrierId to 2 > # now new buffers have a chance to be included in the 2nd checkpoint (though > they belong to the next one) > > For pendingCheckpointBarrierId(), consider an input gate with two channels A > and B and two barriers 1 and 2: > # Channel A receives both barriers, channel B receives nothing yet > # Task thread processes both barriers on A, eventually triggering 2nd > checkpoint > # Channel A state is now BARRIER_RECEIVED, channel B - pending (with id=2) > # Channel B receives the 1st barrier and becomes BARRIER_RECEIVED > # No buffers in B between barriers 1 and 2 will be included in the > checkpoint > # Channel B receives the 2nd barrier which will eventually conclude the > checkpoint > > I see a solution in doing an action only if passed checkpointId >= > pendingCheckpointId. For that, a separate field will be needed to hold the > status (RECEIVED/COMPLETED/PENDING). The class isn't thread-safe so it > shouldn't be a problem.
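The fix proposed at the end of the ticket — act only when the passed checkpointId is at least the pending one, with the status held in a separate field — can be sketched as follows. All names here are illustrative, not the actual Flink class:

```java
public class ChannelStatePersisterSketch {

    enum Status { PENDING, BARRIER_RECEIVED, COMPLETED }

    // Status and id are kept separately, as the ticket suggests, instead of
    // encoding BARRIER_RECEIVED/COMPLETED as sentinel values of the id field.
    private Status status = Status.COMPLETED;
    private long pendingCheckpointId = -1L;

    void startPersisting(long checkpointId) {
        if (checkpointId >= pendingCheckpointId) { // ignore stale requests
            status = Status.PENDING;
            pendingCheckpointId = checkpointId;
        }
    }

    void checkForBarrier(long barrierCheckpointId) {
        if (barrierCheckpointId >= pendingCheckpointId) {
            status = Status.BARRIER_RECEIVED;
            pendingCheckpointId = barrierCheckpointId;
        }
    }

    void stopPersisting(long checkpointId) {
        if (checkpointId >= pendingCheckpointId) {
            status = Status.COMPLETED;
            pendingCheckpointId = checkpointId;
        }
    }

    boolean maybePersist() {
        return status == Status.PENDING; // persist only for the pending checkpoint
    }

    public static void main(String[] args) {
        ChannelStatePersisterSketch p = new ChannelStatePersisterSketch();
        p.startPersisting(2);  // 2nd checkpoint starts persisting
        p.stopPersisting(1);   // stale abort of the 1st checkpoint: ignored (1 < 2)
        System.out.println(p.maybePersist()); // true -- id 2 is still persisting
    }
}
```

With the plain-field version from the description, the stale `stopPersisting(1)` would have overwritten the state and stopped persisting for checkpoint 2; the `>=` guard discards it.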
[GitHub] [flink] flinkbot edited a comment on pull request #14037: Revert "[FLINK-20033] Ensure that stopping a JobMaster will suspend the running job"
flinkbot edited a comment on pull request #14037: URL: https://github.com/apache/flink/pull/14037#issuecomment-725562164 ## CI report: * 9e33b284267c603bbd75fd933b2a1fffd1a4a748 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9475)
[jira] [Commented] (FLINK-20097) Race conditions in InputChannel.ChannelStatePersister
[ https://issues.apache.org/jira/browse/FLINK-20097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17230255#comment-17230255 ] Roman Khachatryan commented on FLINK-20097: --- Please note that even with a single checkpoint in flight, there can be many older, outdated barriers in the stream. This can happen with high backpressure and/or a low timeout (that's why I set the checkpoint timeout to 10, plus minPause, plus interval).
[jira] [Commented] (FLINK-20097) Race conditions in InputChannel.ChannelStatePersister
[ https://issues.apache.org/jira/browse/FLINK-20097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17230241#comment-17230241 ] Arvid Heise commented on FLINK-20097: - > In AC mode, it seems to always pass (didn't check all the combinations). AC mode never touches the ChannelPersister, so I'm not surprised. In particular, {{stopPersisting}} is never called by {{AlignedController}}.
[jira] [Commented] (FLINK-20097) Race conditions in InputChannel.ChannelStatePersister
[ https://issues.apache.org/jira/browse/FLINK-20097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17230236#comment-17230236 ]

Arvid Heise commented on FLINK-20097:
-------------------------------------

```
Caused by: java.util.NoSuchElementException
	at java.util.ArrayDeque$DeqIterator.next(ArrayDeque.java:642)
```

This looks very much like FLINK-20030.

> Race conditions in InputChannel.ChannelStatePersister
> -----------------------------------------------------
>
>                 Key: FLINK-20097
>                 URL: https://issues.apache.org/jira/browse/FLINK-20097
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing, Runtime / Network
>    Affects Versions: 1.12.0
>            Reporter: Roman Khachatryan
>            Assignee: Roman Khachatryan
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.12.0
>
> In InputChannel.ChannelStatePersister, stopPersisting() and checkForBarrier() always update pendingCheckpointBarrierId, potentially overwriting a newer id (or the BARRIER_RECEIVED value) with an older one.
>
> For stopPersisting(), consider this case:
> # Two consecutive UC barriers arrive at the same channel (the 1st being stale at some point)
> # In RemoteInputChannel.onBuffer, the netty thread updates pendingCheckpointBarrierId to BARRIER_RECEIVED
> # The task thread processes the 1st barrier and triggers a checkpoint
> # The task thread processes the 2nd barrier and aborts the 1st checkpoint, calling stopPersisting() from the UC controller and setting pendingCheckpointBarrierId to CHECKPOINT_COMPLETED
> # The task thread starts the 2nd checkpoint and calls startPersisting(), setting pendingCheckpointBarrierId to 2
> # Now new buffers have a chance to be included in the 2nd checkpoint (though they belong to the next one)
>
> For checkForBarrier(), consider an input gate with two channels A and B and two barriers 1 and 2:
> # Channel A receives both barriers; channel B receives nothing yet
> # The task thread processes both barriers on A, eventually triggering the 2nd checkpoint
> # Channel A's state is now BARRIER_RECEIVED; channel B is pending (with id=2)
> # Channel B receives the 1st barrier and becomes BARRIER_RECEIVED
> # No buffers in B between barriers 1 and 2 will be included in the checkpoint
> # Channel B receives the 2nd barrier, which will eventually conclude the checkpoint
>
> I see a solution in performing an action only if the passed checkpointId >= pendingCheckpointId. For that, a separate field will be needed to hold the status (RECEIVED/COMPLETED/PENDING). The class isn't thread-safe, so this shouldn't be a problem.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
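The status-field guard proposed at the end of the description above could be sketched roughly as follows. All names here (ChannelStatePersisterSketch, isPersisting, the Status values) are illustrative assumptions mirroring the methods named in the issue, not Flink's actual implementation:

```java
// Illustrative sketch of the proposed fix: keep the checkpoint status in a
// separate field and ignore any call carrying a stale checkpoint/barrier id.
public class ChannelStatePersisterSketch {

    enum Status { PENDING, BARRIER_RECEIVED, COMPLETED }

    private long pendingCheckpointId = -1L;
    private Status status = Status.COMPLETED;

    /** Task thread: a new checkpoint starts persisting on this channel. */
    public void startPersisting(long checkpointId) {
        if (checkpointId > pendingCheckpointId) {
            pendingCheckpointId = checkpointId;
            status = Status.PENDING;
        }
    }

    /** UC controller: a checkpoint was aborted or completed.
     *  A stale id must not clobber the state of a newer checkpoint. */
    public void stopPersisting(long checkpointId) {
        if (checkpointId >= pendingCheckpointId) {
            status = Status.COMPLETED;
        }
    }

    /** Netty thread: a barrier was seen in the buffer stream. */
    public void checkForBarrier(long barrierId) {
        if (barrierId >= pendingCheckpointId) {
            pendingCheckpointId = barrierId;
            status = Status.BARRIER_RECEIVED;
        }
    }

    /** Buffers are persisted only while a checkpoint is pending on this channel. */
    public boolean isPersisting() {
        return status == Status.PENDING;
    }

    public static void main(String[] args) {
        ChannelStatePersisterSketch channel = new ChannelStatePersisterSketch();
        channel.startPersisting(2L);    // checkpoint 2 starts
        channel.stopPersisting(1L);     // stale abort of checkpoint 1: ignored
        assert channel.isPersisting();  // checkpoint 2 still collects buffers
        channel.checkForBarrier(1L);    // stale barrier 1: also ignored
        assert channel.isPersisting();
        channel.checkForBarrier(2L);    // barrier 2 arrives: stop persisting
        assert !channel.isPersisting();
    }
}
```

With such a guard, the stale stopPersisting() call from the first scenario and the stale barrier from the second scenario both become no-ops, so buffers can no longer leak into (or be dropped from) the wrong checkpoint.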
[jira] [Comment Edited] (FLINK-20097) Race conditions in InputChannel.ChannelStatePersister
[ https://issues.apache.org/jira/browse/FLINK-20097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17230236#comment-17230236 ]

Arvid Heise edited comment on FLINK-20097 at 11/11/20, 9:25 PM:
----------------------------------------------------------------

{noformat}
Caused by: java.util.NoSuchElementException
	at java.util.ArrayDeque$DeqIterator.next(ArrayDeque.java:642)
{noformat}

This looks very much like FLINK-20030.
[jira] [Commented] (FLINK-20097) Race conditions in InputChannel.ChannelStatePersister
[ https://issues.apache.org/jira/browse/FLINK-20097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17230235#comment-17230235 ]

Arvid Heise commented on FLINK-20097:
-------------------------------------

You are right that the code currently does not behave well with two concurrent UCs. I actually wanted to make it possible to process concurrent checkpoints in all 1.12 code, but that didn't make it through review: the assumption was that, as long as we don't know whether we want to support concurrent UCs, the code was not needed.

I'm unsure why things are failing with a single checkpoint. It could be related to cancellation, such that two UC barriers actually arrive at some task.
[GitHub] [flink] flinkbot edited a comment on pull request #14038: [BP-1.11] Revert "[FLINK-20033] Ensure that stopping a JobMaster will suspend the running job"
flinkbot edited a comment on pull request #14038:
URL: https://github.com/apache/flink/pull/14038#issuecomment-725562511

## CI report:

* 20a93f5e041c8414e5cf1b91d73b1d76b25e42ad Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9476)

Bot commands — the @flinkbot bot supports the following commands:
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #14040: [FLINK-20097][checkpointing] Race conditions in InputChannel.ChannelStatePersister (confirm)
flinkbot edited a comment on pull request #14040:
URL: https://github.com/apache/flink/pull/14040#issuecomment-725585398

## CI report:

* d95bf0fdccd16306024e3d37b81f7704cd618dc4 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9478)
* 690c631a452a67702d2191c5dc2b3146e7338322 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9480)
[GitHub] [flink] flinkbot edited a comment on pull request #14040: [FLINK-20097][checkpointing] Race conditions in InputChannel.ChannelStatePersister (confirm)
flinkbot edited a comment on pull request #14040:
URL: https://github.com/apache/flink/pull/14040#issuecomment-725585398

## CI report:

* d95bf0fdccd16306024e3d37b81f7704cd618dc4 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9478)
* 690c631a452a67702d2191c5dc2b3146e7338322 UNKNOWN
[jira] [Updated] (FLINK-20097) Race conditions in InputChannel.ChannelStatePersister
[ https://issues.apache.org/jira/browse/FLINK-20097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Roman Khachatryan updated FLINK-20097:
--------------------------------------

    Summary: Race conditions in InputChannel.ChannelStatePersister
       (was: Race conditions in InputChannel.ChannelStatePersister (confirm))
[jira] [Commented] (FLINK-20097) Race conditions in InputChannel.ChannelStatePersister (confirm)
[ https://issues.apache.org/jira/browse/FLINK-20097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17230202#comment-17230202 ]

Roman Khachatryan commented on FLINK-20097:
-------------------------------------------

Fixing the original issues with ChannelStatePersister (as described) seems to resolve the failures.
[jira] [Commented] (FLINK-20097) Race conditions in InputChannel.ChannelStatePersister (confirm)
[ https://issues.apache.org/jira/browse/FLINK-20097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17230195#comment-17230195 ]

Roman Khachatryan commented on FLINK-20097:
-------------------------------------------

Just to clarify, I used shouldPerformUnalignedCheckpointOnParallelRemoteChannel (parallelism 5, slotsPerTaskManager 1, slotSharing false). In AC mode it seems to always pass (I didn't check all the combinations).
[jira] [Comment Edited] (FLINK-20097) Race conditions in InputChannel.ChannelStatePersister (confirm)
[ https://issues.apache.org/jira/browse/FLINK-20097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17230183#comment-17230183 ]

Roman Khachatryan edited comment on FLINK-20097 at 11/11/20, 8:05 PM:
----------------------------------------------------------------------

OK, the tests fail with the increased checkpointing frequency plus some other settings updated. I don't know whether the root cause is what I described above.

Changes:
{code:java}
long minCheckpoints = 100;
env.enableCheckpointing(10);
env.getCheckpointConfig().setAlignmentTimeout(0);
env.getCheckpointConfig().setCheckpointTimeout(10); // minimum allowed
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(0);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(Integer.MAX_VALUE);
// can be lower because of the high TolerableCheckpointFailureNumber
// collector.checkThat(result.getAccumulatorResult(NUM_FAILURES), equalTo(EXPECTED_FAILURES));

// sink:
if (random.nextInt(100) == 42) {
    Thread.sleep(7);
}
{code}

Failures: shouldPerformUnalignedCheckpointOnParallelRemoteChannel: NUM_DUPLICATES > 0 (2 out of 70 runs).

With CheckpointTimeout 5, interval 1, sleep 1 I also get two out-of-order records and one corrupted state on recovery:
{code}
Caused by: java.io.EOFException
	at org.apache.flink.core.memory.DataInputDeserializer.readByte(DataInputDeserializer.java:134)
	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:199)
	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
	at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
	at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:574)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:538)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
	at java.lang.Thread.run(Thread.java:748)
{code}

The corrupted state is very similar to what I observed in FLINK-19681.
[GitHub] [flink] flinkbot edited a comment on pull request #14041: [FLINK-20096][docs] Add clean up PyFlink docs
flinkbot edited a comment on pull request #14041:
URL: https://github.com/apache/flink/pull/14041#issuecomment-725623844

## CI report:

* 8db7bc182e4c999ac23507ab6c691c09c6fa265b Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9479)
[GitHub] [flink] flinkbot edited a comment on pull request #14035: [FLINK-19340][table-runtime-blink] Support NestedRowData in RowDataSerializer#copy
flinkbot edited a comment on pull request #14035:
URL: https://github.com/apache/flink/pull/14035#issuecomment-725492251

## CI report:

* fa239b08ec2237f4c47fd9871b37017a177fef9a Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9471)
[jira] [Comment Edited] (FLINK-20097) Race conditions in InputChannel.ChannelStatePersister (confirm)
[ https://issues.apache.org/jira/browse/FLINK-20097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17230189#comment-17230189 ]

Roman Khachatryan edited comment on FLINK-20097 at 11/11/20, 7:58 PM:
----------------------------------------------------------------------

And with
{code}
env.enableCheckpointing(1);
env.getCheckpointConfig().setCheckpointTimeout(2); // had to disable check
Thread.sleep(1); // in sink 42/100
{code}

I got one
{code}
Caused by: java.util.NoSuchElementException
	at java.util.ArrayDeque$DeqIterator.next(ArrayDeque.java:642)
	at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.getInflightBuffers(RemoteInputChannel.java:537)
	at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.checkpointStarted(RemoteInputChannel.java:509)
{code}

This is probably FLINK-20030?
[jira] [Comment Edited] (FLINK-20097) Race conditions in InputChannel.ChannelStatePersister (confirm)
[ https://issues.apache.org/jira/browse/FLINK-20097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17230189#comment-17230189 ]

Roman Khachatryan edited comment on FLINK-20097 at 11/11/20, 7:55 PM:
----------------------------------------------------------------------

And with
{code}
env.enableCheckpointing(1);
env.getCheckpointConfig().setCheckpointTimeout(2);
Thread.sleep(1); // in sink 42/100
{code}

I got one
{code}
Caused by: java.util.NoSuchElementException
	at java.util.ArrayDeque$DeqIterator.next(ArrayDeque.java:642)
	at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.getInflightBuffers(RemoteInputChannel.java:537)
	at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.checkpointStarted(RemoteInputChannel.java:509)
{code}
[jira] [Commented] (FLINK-20097) Race conditions in InputChannel.ChannelStatePersister (confirm)
[ https://issues.apache.org/jira/browse/FLINK-20097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17230189#comment-17230189 ] Roman Khachatryan commented on FLINK-20097: --- And with {code} env.getCheckpointConfig().setCheckpointTimeout(2); env.enableCheckpointing(1); Thread.sleep(1); // in sink 42/100 {code} I got one {code} Caused by: java.util.NoSuchElementException at java.util.ArrayDeque$DeqIterator.next(ArrayDeque.java:642) at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.getInflightBuffers(RemoteInputChannel.java:537) at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.checkpointStarted(RemoteInputChannel.java:509) {code} > Race conditions in InputChannel.ChannelStatePersister (confirm) > --- > > Key: FLINK-20097 > URL: https://issues.apache.org/jira/browse/FLINK-20097 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing, Runtime / Network >Affects Versions: 1.12.0 >Reporter: Roman Khachatryan >Assignee: Roman Khachatryan >Priority: Major > Labels: pull-request-available > Fix For: 1.12.0 > > > In InputChannel.ChannelStatePersister, stopPersisting() and checkForBarrier() > always update pendingCheckpointBarrierId, potentially overwriting newer id > (or BARRIER_RECEIVED value) with an old one. 
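The `NoSuchElementException` in the stack trace above is what `ArrayDeque`'s iterator throws when `next()` is called past the last element — for example, when an element count read by one thread goes stale because another thread removed buffers before the iteration ran. A minimal single-threaded reproduction (illustrative only, not Flink code):

```java
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Illustration only: calling next() more times than the deque has elements
// (e.g. based on a stale count) produces the same NoSuchElementException
// seen in the getInflightBuffers() stack trace.
public class StaleCountDemo {

    static boolean drainWithStaleCount(ArrayDeque<Integer> buffers, int staleCount) {
        Iterator<Integer> it = buffers.iterator();
        try {
            for (int i = 0; i < staleCount; i++) {
                it.next(); // walks past the end when staleCount > buffers.size()
            }
            return false; // no exception: the count was accurate
        } catch (NoSuchElementException e) {
            return true; // same exception as in the stack trace
        }
    }

    public static void main(String[] args) {
        ArrayDeque<Integer> buffers = new ArrayDeque<>();
        buffers.add(1);
        buffers.add(2);
        // A count of 3 was read; meanwhile one buffer was (conceptually) removed.
        System.out.println("threw: " + drainWithStaleCount(buffers, 3)); // threw: true
    }
}
```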
[GitHub] [flink] flinkbot commented on pull request #14041: [FLINK-20096][docs] Add clean up PyFlink docs
flinkbot commented on pull request #14041:
URL: https://github.com/apache/flink/pull/14041#issuecomment-725623844

## CI report:

* 8db7bc182e4c999ac23507ab6c691c09c6fa265b UNKNOWN

Bot commands

The @flinkbot bot supports the following commands:
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-20097) Race conditions in InputChannel.ChannelStatePersister (confirm)
[ https://issues.apache.org/jira/browse/FLINK-20097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17230183#comment-17230183 ]

Roman Khachatryan commented on FLINK-20097:
-------------------------------------------

OK, the tests fail with the increased checkpointing frequency plus some other settings updated. I don't know whether the root cause is what I described above.

Changes:
{code:java}
long minCheckpoints = 100;
env.enableCheckpointing(10);
env.getCheckpointConfig().setAlignmentTimeout(0);
env.getCheckpointConfig().setCheckpointTimeout(10); // minimum allowed
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(0);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(Integer.MAX_VALUE);
// can be lower because of the high TolerableCheckpointFailureNumber
// collector.checkThat(result.getAccumulatorResult(NUM_FAILURES), equalTo(EXPECTED_FAILURES));

// sink:
if (random.nextInt(100) == 42) {
    Thread.sleep(7);
}
{code}

Failures: shouldPerformUnalignedCheckpointOnParallelRemoteChannel: NUM_DUPLICATES > 0 (2 out of 70 runs)

I've also observed recovery problems (corrupted stream) similar to FLINK-19681, likely caused by corrupted InputChannel state.
[GitHub] [flink] flinkbot commented on pull request #14041: [FLINK-20096][docs] Add clean up PyFlink docs
flinkbot commented on pull request #14041:
URL: https://github.com/apache/flink/pull/14041#issuecomment-725616287

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

## Automated Checks

Last check on commit 5eaf23ae2c3bf68e1fba047b77f7671fe68b9e41 (Wed Nov 11 19:28:44 UTC 2020) ✅ no warnings

Mention the bot in a comment to re-run the automated checks.

## Review Progress

* ❓ 1. The [description] looks good.
* ❓ 2. There is [consensus] that the contribution should go into Flink.
* ❓ 3. Needs [attention] from.
* ❓ 4. The change fits into the overall [architecture].
* ❓ 5. Overall code [quality] is good.

Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands

The @flinkbot bot supports the following commands:
- `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until `architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] sjwiesman commented on pull request #14041: [FLINK-20096][docs] Add clean up PyFlink docs
sjwiesman commented on pull request #14041:
URL: https://github.com/apache/flink/pull/14041#issuecomment-725615134

cc @morsapaes
[GitHub] [flink] flinkbot edited a comment on pull request #14039: [BP-1.10] Revert "[FLINK-20033] Ensure that stopping a JobMaster will suspend the running job"
flinkbot edited a comment on pull request #14039:
URL: https://github.com/apache/flink/pull/14039#issuecomment-725563191

## CI report:

* f63c90a10178063598a6b7c91f6325a8f191479a Travis: [SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/200081234)

Bot commands

The @flinkbot bot supports the following commands:
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
[jira] [Updated] (FLINK-20096) Clean up PyFlink documentation
[ https://issues.apache.org/jira/browse/FLINK-20096?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-20096:
-----------------------------------
    Labels: pull-request-available  (was: )

> Clean up PyFlink documentation
> ------------------------------
>
>                 Key: FLINK-20096
>                 URL: https://issues.apache.org/jira/browse/FLINK-20096
>             Project: Flink
>          Issue Type: Bug
>          Components: Documentation
>            Reporter: Seth Wiesman
>            Assignee: Seth Wiesman
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.12.0
[GitHub] [flink] sjwiesman opened a new pull request #14041: [FLINK-20096][docs] Add clean up PyFlink docs
sjwiesman opened a new pull request #14041:
URL: https://github.com/apache/flink/pull/14041

## What is the purpose of the change

This cleans up the PyFlink documentation. Mostly fixing English grammar and adding additional cross-links. I've also added an overview page to the section.

![Screen Shot 2020-11-11 at 1 22 22 PM](https://user-images.githubusercontent.com/1891970/98855282-5d229400-2421-11eb-965a-de49d7298508.png)
[GitHub] [flink] sjwiesman commented on a change in pull request #14003: [FLINK-19527][Doc]Flink SQL Getting Started
sjwiesman commented on a change in pull request #14003: URL: https://github.com/apache/flink/pull/14003#discussion_r521573057 ## File path: docs/dev/table/sql/gettingStarted.md ## @@ -0,0 +1,200 @@ +--- +title: "Getting Started - Flink SQL" +nav-parent_id: sql +nav-pos: 0 +--- + + + +* This will be replaced by the TOC +{:toc} + +Flink SQL enables SQL developers to design and develop the batch or streaming application without writing the Java, Scala, or any other programming language code. It provides a unified API for both batch and streaming APIs. As a user, you can perform powerful transformations. Flink’s SQL support is based on [Apache Calcite](https://calcite.apache.org/) which implements the SQL standard. Review comment: Generally like to focus on streaming over batch. ```suggestion Flink SQL enables SQL developers to design and develop the batch or streaming application without writing the Java, Scala, or any other programming language code. It provides a unified API for both stream and batch processing. As a user, you can perform powerful transformations. Flink’s SQL support is based on [Apache Calcite](https://calcite.apache.org/) which implements the SQL standard. ``` ## File path: docs/dev/table/sql/gettingStarted.md ## @@ -0,0 +1,200 @@ +--- +title: "Getting Started - Flink SQL" +nav-parent_id: sql +nav-pos: 0 +--- + + + +* This will be replaced by the TOC +{:toc} + +Flink SQL enables SQL developers to design and develop the batch or streaming application without writing the Java, Scala, or any other programming language code. It provides a unified API for both batch and streaming APIs. As a user, you can perform powerful transformations. Flink’s SQL support is based on [Apache Calcite](https://calcite.apache.org/) which implements the SQL standard. + +In addition to the SQL API, Flink also has Table API as well with similar semantics as SQL. 
Table API is a Language integrated API, where we use a specific programming language to write the queries or call the API. For example, we create the table environment, get a table object, and apply different methods that return table API objects. It supports different languages e.g. Java, Scala, Python. + +Flink SQL and Table API are just two ways to write queries that use the same API underneath. It wouldn’t be wrong if we say Table API is a wrapper on top of the streaming API. SQL API is a more descriptive way of writing queries using well-known SQL standards where we usually have ‘select * from Table’ pattern. Table API query starts with from clause, followed by joins and where clause, and then finally projection or select at the last. SQL API is easy to learn and almost everyone knows it already. All the queries are optimized for efficient execution. We will focus on the Flink SQL API, while you can read more about Table API [here]({{ site.baseurl }}/dev/table/). + +### Pre-requisites +You only need to have basic knowledge of SQL to follow along. You will not need to write Java or Scala code or use an IDE. + +### Installation +There are various ways to install the Flink. You can download the source code, compile it, and run it. Another option is to have it running inside the container. Probably the easiest one is to download the binaries and run it locally for experimentation. We assume local installation for the rest of the tutorial. You can start a local cluster using the following command from the installation folder Review comment: > There are various ways to install the Flink It's not *the flink*, just Flink: "There are various ways to install Flink". > You can download the source code, compile it, and run it. Compiling Flink is difficult; I'd rather not mention that in a getting started guide. If we are guiding users towards local installation, how about a link to the downloads page.
## File path: docs/dev/table/sql/gettingStarted.md ## @@ -0,0 +1,200 @@ +--- +title: "Getting Started - Flink SQL" +nav-parent_id: sql +nav-pos: 0 +--- + + + +* This will be replaced by the TOC +{:toc} + +Flink SQL enables SQL developers to design and develop the batch or streaming application without writing the Java, Scala, or any other programming language code. It provides a unified API for both batch and streaming APIs. As a user, you can perform powerful transformations. Flink’s SQL support is based on [Apache Calcite](https://calcite.apache.org/) which implements the SQL standard. + +In addition to the SQL API, Flink also has Table API as well with similar semantics as SQL. Table API is a Language integrated API, where we use a specific programming language to write the queries or call the API. For example, we create the table environment, get a table object, and apply different methods that return table API objects. It supports different languages e.g. Java, Scala, Python. + +Flink SQL and Table API
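As a concrete instance of the "select * from Table" pattern the draft above describes, a minimal Flink SQL query might look like the following (the table and column names are invented for illustration; they are not part of the PR under review):

```sql
-- Filter and project over a table (names are made up for illustration).
SELECT user_id, amount
FROM orders
WHERE amount > 100;
```

The equivalent Table API program would start from the table object and chain filter/select calls — the "from clause first" ordering the draft describes.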
[GitHub] [flink] sjwiesman commented on a change in pull request #14036: [FLINK-20093] Create a download page for all optional sql client components
sjwiesman commented on a change in pull request #14036:
URL: https://github.com/apache/flink/pull/14036#discussion_r521571620

## File path: docs/_data/sql-connectors.yml

## @@ -0,0 +1,86 @@
+avro:

Review comment: Could you also add a comment explaining how to add a new format or connector. I want it to be really self-evident how to maintain this.
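A maintenance comment of the kind requested might look like the sketch below. Note this is only an illustration: the field names are hypothetical placeholders, and the real keys should be copied from the existing `avro` entry in the PR.

```yaml
# docs/_data/sql-connectors.yml
#
# Each top-level key is one optional SQL component. To add a new format or
# connector, copy an existing entry (e.g. `avro`) and adjust its fields;
# the download page is generated from this file automatically.
#
# NOTE: the keys below are illustrative placeholders, not the file's schema.
example-format:
  name: Example Format
  category: format
  maven: flink-sql-example-format
```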