[jira] [Commented] (FLINK-22387) UpsertKafkaTableITCase hangs when setting up kafka
[ https://issues.apache.org/jira/browse/FLINK-22387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17335158#comment-17335158 ] Guowei Ma commented on FLINK-22387: --- https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=17371=logs=72d4811f-9f0d-5fd0-014a-0bc26b72b642=c1d93a6a-ba91-515d-3196-2ee8019fbda7=6899 > UpsertKafkaTableITCase hangs when setting up kafka > -- > > Key: FLINK-22387 > URL: https://issues.apache.org/jira/browse/FLINK-22387 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka, Table SQL / Ecosystem >Affects Versions: 1.13.0 >Reporter: Dawid Wysakowicz >Priority: Critical > Labels: test-stability > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=16901=logs=c5f0071e-1851-543e-9a45-9ac140befc32=1fb1a56f-e8b5-5a82-00a0-a2db7757b4f5=6932 > {code} > 2021-04-20T20:01:32.2276988Z Apr 20 20:01:32 "main" #1 prio=5 os_prio=0 > tid=0x7fe87400b000 nid=0x4028 runnable [0x7fe87df22000] > 2021-04-20T20:01:32.2277666Z Apr 20 20:01:32java.lang.Thread.State: > RUNNABLE > 2021-04-20T20:01:32.2278338Z Apr 20 20:01:32 at > org.testcontainers.shaded.okio.Buffer.getByte(Buffer.java:312) > 2021-04-20T20:01:32.2279325Z Apr 20 20:01:32 at > org.testcontainers.shaded.okio.RealBufferedSource.readHexadecimalUnsignedLong(RealBufferedSource.java:310) > 2021-04-20T20:01:32.2280656Z Apr 20 20:01:32 at > org.testcontainers.shaded.okhttp3.internal.http1.Http1ExchangeCodec$ChunkedSource.readChunkSize(Http1ExchangeCodec.java:492) > 2021-04-20T20:01:32.2281603Z Apr 20 20:01:32 at > org.testcontainers.shaded.okhttp3.internal.http1.Http1ExchangeCodec$ChunkedSource.read(Http1ExchangeCodec.java:471) > 2021-04-20T20:01:32.2282163Z Apr 20 20:01:32 at > org.testcontainers.shaded.okhttp3.internal.Util.skipAll(Util.java:204) > 2021-04-20T20:01:32.2282870Z Apr 20 20:01:32 at > org.testcontainers.shaded.okhttp3.internal.Util.discard(Util.java:186) > 2021-04-20T20:01:32.2283494Z Apr 20 20:01:32 at > 
org.testcontainers.shaded.okhttp3.internal.http1.Http1ExchangeCodec$ChunkedSource.close(Http1ExchangeCodec.java:511) > 2021-04-20T20:01:32.2284460Z Apr 20 20:01:32 at > org.testcontainers.shaded.okio.ForwardingSource.close(ForwardingSource.java:43) > 2021-04-20T20:01:32.2285183Z Apr 20 20:01:32 at > org.testcontainers.shaded.okhttp3.internal.connection.Exchange$ResponseBodySource.close(Exchange.java:313) > 2021-04-20T20:01:32.2285756Z Apr 20 20:01:32 at > org.testcontainers.shaded.okio.RealBufferedSource.close(RealBufferedSource.java:476) > 2021-04-20T20:01:32.2286287Z Apr 20 20:01:32 at > org.testcontainers.shaded.okhttp3.internal.Util.closeQuietly(Util.java:139) > 2021-04-20T20:01:32.2286795Z Apr 20 20:01:32 at > org.testcontainers.shaded.okhttp3.ResponseBody.close(ResponseBody.java:192) > 2021-04-20T20:01:32.2287270Z Apr 20 20:01:32 at > org.testcontainers.shaded.okhttp3.Response.close(Response.java:290) > 2021-04-20T20:01:32.2287913Z Apr 20 20:01:32 at > org.testcontainers.shaded.com.github.dockerjava.okhttp.OkDockerHttpClient$OkResponse.close(OkDockerHttpClient.java:285) > 2021-04-20T20:01:32.2288606Z Apr 20 20:01:32 at > org.testcontainers.shaded.com.github.dockerjava.core.DefaultInvocationBuilder.lambda$null$0(DefaultInvocationBuilder.java:272) > 2021-04-20T20:01:32.2289295Z Apr 20 20:01:32 at > org.testcontainers.shaded.com.github.dockerjava.core.DefaultInvocationBuilder$$Lambda$340/2058508175.close(Unknown > Source) > 2021-04-20T20:01:32.2289886Z Apr 20 20:01:32 at > com.github.dockerjava.api.async.ResultCallbackTemplate.close(ResultCallbackTemplate.java:77) > 2021-04-20T20:01:32.2290567Z Apr 20 20:01:32 at > org.testcontainers.utility.ResourceReaper.start(ResourceReaper.java:202) > 2021-04-20T20:01:32.2291051Z Apr 20 20:01:32 at > org.testcontainers.DockerClientFactory.client(DockerClientFactory.java:205) > 2021-04-20T20:01:32.2291879Z Apr 20 20:01:32 - locked <0xe9cd50f8> > (a [Ljava.lang.Object;) > 2021-04-20T20:01:32.2292313Z Apr 20 20:01:32 at > 
org.testcontainers.LazyDockerClient.getDockerClient(LazyDockerClient.java:14) > 2021-04-20T20:01:32.2292870Z Apr 20 20:01:32 at > org.testcontainers.LazyDockerClient.authConfig(LazyDockerClient.java:12) > 2021-04-20T20:01:32.2293383Z Apr 20 20:01:32 at > org.testcontainers.containers.GenericContainer.start(GenericContainer.java:310) > 2021-04-20T20:01:32.2293890Z Apr 20 20:01:32 at > org.testcontainers.containers.GenericContainer.starting(GenericContainer.java:1029) > 2021-04-20T20:01:32.2294578Z Apr 20 20:01:32 at > org.testcontainers.containers.FailureDetectingExternalResource$1.evaluate(FailureDetectingExternalResource.java:29) > 2021-04-20T20:01:32.2295157Z Apr 20 20:01:32 at >
[GitHub] [flink] flinkbot edited a comment on pull request #15797: [FLINK-22484][table] Add built-in MAP_KEYS, MAP_VALUES, MAP_FROM_ARRA…
flinkbot edited a comment on pull request #15797: URL: https://github.com/apache/flink/pull/15797#issuecomment-828531728 ## CI report: * 32b72459e66decd79269d3718f1ee40c3d765775 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17372) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-22285) YARNHighAvailabilityITCase hangs
[ https://issues.apache.org/jira/browse/FLINK-22285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17335141#comment-17335141 ] Guowei Ma edited comment on FLINK-22285 at 4/29/21, 5:44 AM: - https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=17356=logs=fc5181b0-e452-5c8f-68de-1097947f6483=6b04ca5f-0b52-511d-19c9-52bf0d9fbdfa=26030 https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=17353=logs=fc5181b0-e452-5c8f-68de-1097947f6483=6b04ca5f-0b52-511d-19c9-52bf0d9fbdfa=26429 was (Author: maguowei): https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=17356=logs=fc5181b0-e452-5c8f-68de-1097947f6483=6b04ca5f-0b52-511d-19c9-52bf0d9fbdfa=26030 > YARNHighAvailabilityITCase hangs > > > Key: FLINK-22285 > URL: https://issues.apache.org/jira/browse/FLINK-22285 > Project: Flink > Issue Type: Bug > Components: Deployment / YARN, Runtime / Coordination >Affects Versions: 1.11.3, 1.12.2 >Reporter: Guowei Ma >Priority: Major > Labels: test-stability > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=16568=logs=8fd975ef-f478-511d-4997-6f15fe8a1fd3=ac0fa443-5d45-5a6b-3597-0310ecc1d2ab=31437 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-22499) JDBC sink table-api support "sink.parallelism" ?
[ https://issues.apache.org/jira/browse/FLINK-22499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17335142#comment-17335142 ] Shengkai Fang commented on FLINK-22499: --- This feature was added in FLINK-19942. You may cherry-pick it to your branch. > JDBC sink table-api support "sink.parallelism" ? > -- > > Key: FLINK-22499 > URL: https://issues.apache.org/jira/browse/FLINK-22499 > Project: Flink > Issue Type: New Feature > Components: Connectors / JDBC >Affects Versions: 1.12.1 >Reporter: ranqiqiang >Priority: Major > > I receive messages from Kafka, then sink them to MySQL/TiDB via the SQL > table-api. > How can I change the sink parallelism ? -- This message was sent by Atlassian Jira (v8.3.4#803005)
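As context for the answer above: FLINK-19942 introduced a `'sink.parallelism'` connector option, so once that change is available the parallelism can be set directly in the sink table's DDL. A minimal sketch (the schema, URL, and table names below are illustrative, not taken from the report):

```sql
CREATE TABLE mysql_sink (
  id BIGINT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydb',
  'table-name' = 'users',
  -- writes to this sink run with parallelism 4, independent of upstream operators
  'sink.parallelism' = '4'
);
```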
[jira] [Updated] (FLINK-22285) YARNHighAvailabilityITCase hangs
[ https://issues.apache.org/jira/browse/FLINK-22285?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guowei Ma updated FLINK-22285: -- Affects Version/s: 1.11.3 > YARNHighAvailabilityITCase hangs > > > Key: FLINK-22285 > URL: https://issues.apache.org/jira/browse/FLINK-22285 > Project: Flink > Issue Type: Bug > Components: Deployment / YARN, Runtime / Coordination >Affects Versions: 1.11.3, 1.12.2 >Reporter: Guowei Ma >Priority: Major > Labels: test-stability > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=16568=logs=8fd975ef-f478-511d-4997-6f15fe8a1fd3=ac0fa443-5d45-5a6b-3597-0310ecc1d2ab=31437 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-22285) YARNHighAvailabilityITCase hangs
[ https://issues.apache.org/jira/browse/FLINK-22285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17335141#comment-17335141 ] Guowei Ma commented on FLINK-22285: --- https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=17356=logs=fc5181b0-e452-5c8f-68de-1097947f6483=6b04ca5f-0b52-511d-19c9-52bf0d9fbdfa=26030 > YARNHighAvailabilityITCase hangs > > > Key: FLINK-22285 > URL: https://issues.apache.org/jira/browse/FLINK-22285 > Project: Flink > Issue Type: Bug > Components: Deployment / YARN, Runtime / Coordination >Affects Versions: 1.12.2 >Reporter: Guowei Ma >Priority: Major > Labels: test-stability > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=16568=logs=8fd975ef-f478-511d-4997-6f15fe8a1fd3=ac0fa443-5d45-5a6b-3597-0310ecc1d2ab=31437 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-22520) KafkaSourceLegacyITCase.testMultipleSourcesOnePartition hangs
Guowei Ma created FLINK-22520: - Summary: KafkaSourceLegacyITCase.testMultipleSourcesOnePartition hangs Key: FLINK-22520 URL: https://issues.apache.org/jira/browse/FLINK-22520 Project: Flink Issue Type: Bug Components: Connectors / Kafka Affects Versions: 1.14.0 Reporter: Guowei Ma There are no error messages. https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=17363=logs=c5f0071e-1851-543e-9a45-9ac140befc32=1fb1a56f-e8b5-5a82-00a0-a2db7757b4f5=42023 {code:java} "main" #1 prio=5 os_prio=0 tid=0x7f4d3400b000 nid=0x203f waiting on condition [0x7f4d3be2e000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0xa68f3b68> (a java.util.concurrent.CompletableFuture$Signaller) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707) at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323) at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) at org.apache.flink.test.util.TestUtils.tryExecute(TestUtils.java:49) at org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.runMultipleSourcesOnePartitionExactlyOnceTest(KafkaConsumerTestBase.java:1112) at org.apache.flink.connector.kafka.source.KafkaSourceLegacyITCase.testMultipleSourcesOnePartition(KafkaSourceLegacyITCase.java:87) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45) at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) at org.junit.rules.RunRules.evaluate(RunRules.java:20) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-22503) getting error on select query in flink sql
[ https://issues.apache.org/jira/browse/FLINK-22503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17335137#comment-17335137 ] Shengkai Fang commented on FLINK-22503: --- Hi. This is the expected error. Please call `{{SET execution.result-mode=tableau;}}` before SELECT in batch mode. Please refer to [1] for more details. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sqlClient.html#running-sql-queries > getting error on select query in flink sql > --- > > Key: FLINK-22503 > URL: https://issues.apache.org/jira/browse/FLINK-22503 > Project: Flink > Issue Type: Bug > Components: Table SQL / Client >Affects Versions: 1.12.2 > Environment: Hi Team, > I added source table *orders* with filesystem connector using csv file. when > i am running the select * from orders, its throwing error. even i chaged the > mode also not working. > !image-2021-04-28-13-33-36-784.png! > [ERROR] Could not execute SQL statement. Reason: > org.apache.flink.table.client.gateway.SqlExecutionException: Results of batch > queries can only be served in table or tableau mode > 2) I used sink as Elasticsearch , but not able to see the data in > elasticsearch index.. Can you please check and help me two issues > !image-2021-04-28-13-40-42-375.png! > !image-2021-04-28-13-43-00-455.png! >Reporter: Bhagi >Priority: Major > Attachments: image-2021-04-28-13-33-36-784.png, > image-2021-04-28-13-40-42-375.png, image-2021-04-28-13-43-00-455.png > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
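To make the suggested workaround concrete, the setting is issued in the SQL client session before running the batch query. A minimal session sketch using the `orders` table from the report:

```sql
-- switch the client result mode so batch query results can be served
SET execution.result-mode=tableau;
SELECT * FROM orders;
```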
[jira] [Closed] (FLINK-22503) getting error on select query in flink sql
[ https://issues.apache.org/jira/browse/FLINK-22503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shengkai Fang closed FLINK-22503. - Resolution: Not A Bug > getting error on select query in flink sql > --- > > Key: FLINK-22503 > URL: https://issues.apache.org/jira/browse/FLINK-22503 > Project: Flink > Issue Type: Bug > Components: Table SQL / Client >Affects Versions: 1.12.2 > Environment: Hi Team, > I added source table *orders* with filesystem connector using csv file. when > i am running the select * from orders, its throwing error. even i chaged the > mode also not working. > !image-2021-04-28-13-33-36-784.png! > [ERROR] Could not execute SQL statement. Reason: > org.apache.flink.table.client.gateway.SqlExecutionException: Results of batch > queries can only be served in table or tableau mode > 2) I used sink as Elasticsearch , but not able to see the data in > elasticsearch index.. Can you please check and help me two issues > !image-2021-04-28-13-40-42-375.png! > !image-2021-04-28-13-43-00-455.png! >Reporter: Bhagi >Priority: Major > Attachments: image-2021-04-28-13-33-36-784.png, > image-2021-04-28-13-40-42-375.png, image-2021-04-28-13-43-00-455.png > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-22503) getting error on select query in flink sql
[ https://issues.apache.org/jira/browse/FLINK-22503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shengkai Fang updated FLINK-22503: -- Component/s: (was: Table SQL / API) Table SQL / Client > getting error on select query in flink sql > --- > > Key: FLINK-22503 > URL: https://issues.apache.org/jira/browse/FLINK-22503 > Project: Flink > Issue Type: Bug > Components: Table SQL / Client >Affects Versions: 1.12.2 > Environment: Hi Team, > I added source table *orders* with filesystem connector using csv file. when > i am running the select * from orders, its throwing error. even i chaged the > mode also not working. > !image-2021-04-28-13-33-36-784.png! > [ERROR] Could not execute SQL statement. Reason: > org.apache.flink.table.client.gateway.SqlExecutionException: Results of batch > queries can only be served in table or tableau mode > 2) I used sink as Elasticsearch , but not able to see the data in > elasticsearch index.. Can you please check and help me two issues > !image-2021-04-28-13-40-42-375.png! > !image-2021-04-28-13-43-00-455.png! >Reporter: Bhagi >Priority: Major > Attachments: image-2021-04-28-13-33-36-784.png, > image-2021-04-28-13-40-42-375.png, image-2021-04-28-13-43-00-455.png > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] YikSanChan closed pull request #15404: [hotfix][docs] Remove redundant import
YikSanChan closed pull request #15404: URL: https://github.com/apache/flink/pull/15404 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-20086) Add documentation for the open method of UserDefinedFunction
[ https://issues.apache.org/jira/browse/FLINK-20086?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu closed FLINK-20086. --- Fix Version/s: (was: 1.11.4) (was: 1.13.0) 1.13.1 1.14.0 Resolution: Fixed Merged to - master via c593209b664af0d112b22c386266476f3a2ee750 - release-1.13 via 8ffefdaed5f2ac22a9b720dc5d0293b7be88a0d4 > Add documentation for the open method of UserDefinedFunction > > > Key: FLINK-20086 > URL: https://issues.apache.org/jira/browse/FLINK-20086 > Project: Flink > Issue Type: Improvement > Components: API / Python, Documentation >Reporter: Dian Fu >Assignee: Yik San Chan >Priority: Major > Labels: pull-request-available > Fix For: 1.14.0, 1.13.1 > > > According to the questions asked by PyFlink users so far, many users are not > aware that there is an open method in UserDefinedFunction where they could > perform initialization work. This method is especially useful for ML users > where they could perform ML model initialization. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (FLINK-20086) Add documentation for the open method of UserDefinedFunction
[ https://issues.apache.org/jira/browse/FLINK-20086?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu reassigned FLINK-20086: --- Assignee: Yik San Chan > Add documentation for the open method of UserDefinedFunction > > > Key: FLINK-20086 > URL: https://issues.apache.org/jira/browse/FLINK-20086 > Project: Flink > Issue Type: Improvement > Components: API / Python, Documentation >Reporter: Dian Fu >Assignee: Yik San Chan >Priority: Major > Labels: pull-request-available > Fix For: 1.11.4, 1.13.0 > > > According to the questions asked by PyFlink users so far, many users are not > aware that there is an open method in UserDefinedFunction where they could > perform initialization work. This method is especially useful for ML users > where they could perform ML model initialization. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] dianfu closed pull request #15795: [FLINK-20086][docs] Override UserDefinedFunction open to load resources
dianfu closed pull request #15795: URL: https://github.com/apache/flink/pull/15795 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-17826) Add missing custom query support on new jdbc connector
[ https://issues.apache.org/jira/browse/FLINK-17826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17335131#comment-17335131 ] Leonard Xu edited comment on FLINK-17826 at 4/29/21, 4:02 AM: -- Hi, [~f.pompermaier] The concerns about the custom query are that (1) after adding a custom query, the JDBC table is semantically a view over JDBC (e.g. select * from db1.A join db1.B) rather than a table, so the 'table-name' option is useless in this case, and (2) such a table may only be usable as a source table and not as a sink table, as in the example above. Based on these, I tend not to add this unclear option. But if you're interested in this issue, you can take it over and I can help review and merge. We can add some specification for (1) and more checks for (2) was (Author: leonard xu): Hi, [~f.pompermaier] The concern about the custom query is that (1) After add a custom query, the JDBC table should be a view of JDBC (e.g select * from db1.A join db1.B) instead of a table from semantics. (2) The table may can only be used as source table and can not as sink table for example above case. base on these, I tend to do not add the unclear option. But If you're interesting this issue, you can take over and I can help review and merge. We can add some specification for (1) and add more checks for (2) > Add missing custom query support on new jdbc connector > -- > > Key: FLINK-17826 > URL: https://issues.apache.org/jira/browse/FLINK-17826 > Project: Flink > Issue Type: Bug > Components: Connectors / JDBC >Reporter: Jark Wu >Assignee: Leonard Xu >Priority: Major > Labels: pull-request-available > Fix For: 1.13.0 > > > In FLINK-17361, we added custom query on JDBC tables, but missing to add the > same ability on new jdbc connector (i.e. > {{JdbcDynamicTableSourceSinkFactory}}). 
> In the new jdbc connector, maybe we should call it {{scan.query}} to keep > consistent with other scan options, besides we need to make {{"table-name"}} > optional, but add validation that "table-name" and "scan.query" shouldn't > both be empty, and "table-name" must not be empty when used as sink. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-17826) Add missing custom query support on new jdbc connector
[ https://issues.apache.org/jira/browse/FLINK-17826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17335131#comment-17335131 ] Leonard Xu commented on FLINK-17826: Hi, [~f.pompermaier] The concern about the custom query is that (1) After add a custom query, the JDBC table should be a view of JDBC (e.g select * from db1.A join db1.B) instead of a table from semantics. (2) The table may can only be used as source table and can not as sink table for example above case. base on these, I tend to do not add the unclear option. But If you're interesting this issue, you can take over and I can help review and merge. We can add some specification for (1) and add more checks for (2) > Add missing custom query support on new jdbc connector > -- > > Key: FLINK-17826 > URL: https://issues.apache.org/jira/browse/FLINK-17826 > Project: Flink > Issue Type: Bug > Components: Connectors / JDBC >Reporter: Jark Wu >Assignee: Leonard Xu >Priority: Major > Labels: pull-request-available > Fix For: 1.13.0 > > > In FLINK-17361, we added custom query on JDBC tables, but missing to add the > same ability on new jdbc connector (i.e. > {{JdbcDynamicTableSourceSinkFactory}}). > In the new jdbc connector, maybe we should call it {{scan.query}} to keep > consistent with other scan options, besides we need to make {{"table-name"}} > optional, but add validation that "table-name" and "scan.query" shouldn't > both be empty, and "table-name" must not be empty when used as sink. -- This message was sent by Atlassian Jira (v8.3.4#803005)
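The option rules proposed in the quoted description ('table-name' and 'scan.query' must not both be empty, and 'table-name' is mandatory for sinks) can be sketched as a small validation routine. This is only a Python illustration of the suggested checks, not actual Flink code; the function name and signature are made up:

```python
def validate_jdbc_options(table_name: str, scan_query: str, used_as_sink: bool) -> None:
    """Sketch of the validation suggested in the issue text:
    'table-name' and 'scan.query' must not both be empty, and
    'table-name' is mandatory when the table is used as a sink."""
    if not table_name and not scan_query:
        raise ValueError("either 'table-name' or 'scan.query' must be set")
    if used_as_sink and not table_name:
        raise ValueError("'table-name' must not be empty when used as sink")

# a source defined only by a custom query would be acceptable
validate_jdbc_options("", "SELECT * FROM db1.A JOIN db1.B", used_as_sink=False)
```

This makes explicit why the custom-query table from the comment above can act as a source but not as a sink.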
[jira] [Assigned] (FLINK-22519) Have python-archives also take tar.gz
[ https://issues.apache.org/jira/browse/FLINK-22519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu reassigned FLINK-22519: --- Assignee: Yik San Chan > Have python-archives also take tar.gz > - > > Key: FLINK-22519 > URL: https://issues.apache.org/jira/browse/FLINK-22519 > Project: Flink > Issue Type: New Feature > Components: API / Python >Reporter: Yik San Chan >Assignee: Yik San Chan >Priority: Major > > [python-archives|https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/python_config.html#python-archives] > currently only takes zip. > In our use case, we want to package the whole conda environment into > python-archives, similar to how the > [docs|https://ci.apache.org/projects/flink/flink-docs-stable/dev/python/faq.html#cluster] > suggest about using venv (Python virtual environment). As we use PyFlink for > ML, there are inevitably a few large dependencies (tensorflow, torch, > pyarrow), as well as a lot of small dependencies. > This pattern is not friendly for zip. According to the > [post|https://superuser.com/a/173825], zip compresses each file > independently, and it does not perform well when dealing with a lot of small > files. On the other hand, tar simply bundles all files into a tarball, then > we can apply gzip to the whole tarball to achieve smaller size. This may > explain why the official packaging tool - conda pack - [conda > pack|https://conda.github.io/conda-pack/] - produces tar.gz by default, even > though zip is an option if we really want to. > To further prove the idea, I used my laptop and conda env to run an > experiment. 
My OS: macOS 10.15.7 > # Create an environment.yaml as well as a requirements.txt > # Run `conda env create -f environment.yaml` to create the conda env > # Run conda pack to produce a tar.gz > # Run conda pack featflow-ml-env.zip to produce a zip > More details: > environment.yaml > {code:yaml} > name: featflow-ml-env > channels: > - pytorch > - conda-forge > - defaults > dependencies: > - python=3.7 > - pytorch=1.8.0 > - scikit-learn=0.23.2 > - pip > - pip: > - -r file:requirements.txt > {code} > requirements.txt > {code:yaml} > apache-flink==1.12.0 > deepctr-torch==0.2.6 > black==20.8b1 > confluent-kafka==1.6.0 > pytest==6.2.2 > testcontainers==3.4.0 > kafka-python==2.0.2 > {code} > > End result: the tar.gz is 854M, the zip is 1.6G > So, long story short, python-archives only supports zip, while zip is not a > good choice for packaging ML libs. Let's change this by adding > python-archives tar.gz support. > Change will happen in this way: In ProcessPythonEnvironmentManager.java, > check the suffix. If tar.gz, unarchive it using gzip decompressor. -- This message was sent by Atlassian Jira (v8.3.4#803005)
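The change described at the end of the ticket (dispatch on the archive suffix in ProcessPythonEnvironmentManager and apply gzip decompression for tar.gz) can be sketched in a few lines. The actual implementation would be Java; this Python sketch, with made-up names, only illustrates the proposed dispatch logic:

```python
import tarfile
import zipfile


def extract_python_archive(archive_path: str, target_dir: str) -> None:
    # Dispatch on the archive suffix, as the issue proposes.
    if archive_path.endswith(".tar.gz"):
        # "r:gz" makes tarfile apply gzip decompression to the tarball.
        with tarfile.open(archive_path, "r:gz") as tar:
            tar.extractall(target_dir)
    elif archive_path.endswith(".zip"):
        with zipfile.ZipFile(archive_path) as zf:
            zf.extractall(target_dir)
    else:
        raise ValueError("unsupported archive suffix: " + archive_path)
```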
[jira] [Updated] (FLINK-20951) IllegalArgumentException when reading Hive parquet table if condition not contain all partitioned fields
[ https://issues.apache.org/jira/browse/FLINK-20951?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rui Li updated FLINK-20951: --- Fix Version/s: (was: 1.12.4) (was: 1.13.0) > IllegalArgumentException when reading Hive parquet table if condition not > contain all partitioned fields > > > Key: FLINK-20951 > URL: https://issues.apache.org/jira/browse/FLINK-20951 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive >Affects Versions: 1.12.0 > Environment: flink 1.12.0release-12 > sql-cli >Reporter: YUJIANBO >Assignee: Rui Li >Priority: Critical > Labels: stale-assigned > > The production hive table is partitioned by two fields:datekey and event > I have do this test by Flink-sql-cli:(Spark Sql All is OK) > (1)First: > SELECT vid From table_A WHERE datekey = '20210112' AND event = 'XXX' AND vid > = 'aa';(OK) > SELECT vid From table_A WHERE datekey = '20210112' AND vid = 'aa'; > (Error) > (2)Second: > SELECT vid From table_B WHERE datekey = '20210112' AND event = 'YYY' AND vid > = 'bb';(OK) > SELECT vid From table_B WHERE datekey = '20210112' AND vid = 'bb'; > (Error) > The exception is: > {code} > java.lang.RuntimeException: One or more fetchers have encountered exception > at > org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:199) > at > org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:154) > at > org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:116) > at > org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:273) > at > org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:67) > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395) > at > 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.RuntimeException: SplitFetcher thread 19 received > unexpected exception while polling the records > at > org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:146) > at > org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:101) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > ... 
1 more > Caused by: java.lang.IllegalArgumentException > at java.nio.Buffer.position(Buffer.java:244) > at > org.apache.flink.hive.shaded.parquet.io.api.Binary$ByteBufferBackedBinary.getBytes(Binary.java:424) > at > org.apache.flink.hive.shaded.formats.parquet.vector.reader.BytesColumnReader.readBatchFromDictionaryIds(BytesColumnReader.java:79) > at > org.apache.flink.hive.shaded.formats.parquet.vector.reader.BytesColumnReader.readBatchFromDictionaryIds(BytesColumnReader.java:33) > at > org.apache.flink.hive.shaded.formats.parquet.vector.reader.AbstractColumnReader.readToVector(AbstractColumnReader.java:199) > at > org.apache.flink.hive.shaded.formats.parquet.ParquetVectorizedInputFormat$ParquetReader.nextBatch(ParquetVectorizedInputFormat.java:359) > at > org.apache.flink.hive.shaded.formats.parquet.ParquetVectorizedInputFormat$ParquetReader.readBatch(ParquetVectorizedInputFormat.java:328) > at > org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:67) > at > org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:56) > at > org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:138) > ... 6 more > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] dianfu closed pull request #15783: [hotfix][python][docs] Fix python.archives docs
dianfu closed pull request #15783: URL: https://github.com/apache/flink/pull/15783 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-22519) Have python-archives also take tar.gz
[ https://issues.apache.org/jira/browse/FLINK-22519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yik San Chan updated FLINK-22519: - Description: [python-archives|https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/python_config.html#python-archives] currently only takes zip. In our use case, we want to package the whole conda environment into python-archives, similar to how the [docs|https://ci.apache.org/projects/flink/flink-docs-stable/dev/python/faq.html#cluster] suggest about using venv (Python virtual environment). As we use PyFlink for ML, there are inevitably a few large dependencies (tensorflow, torch, pyarrow), as well as a lot of small dependencies. This pattern is not friendly for zip. According to the [post|https://superuser.com/a/173825], zip compresses each file independently, and it is not performing good when dealing with a lot of small files. On the other hand, tar simply bundles all files into a tarball, then we can apply gzip to the whole tarball to achieve smaller size. This may explain why the official packaging tool - conda pack - [conda pack|https://conda.github.io/conda-pack/] - produces tar.gz by default, even though zip is an option if we really want to. To further prove the idea, I use my laptop and conda env to run an experiment. 
My OS: macOS 10.15.7 # Create an environment.yaml as well as a requirements.txt # Run `conda env create -f environment.yaml` to create the conda env # Run conda pack to produce a tar.gz # Run conda pack faetflow-ml-env.zip to produce a zip More details: environment.yaml {code:yaml} name: featflow-ml-env channels: - pytorch - conda-forge - defaults dependencies: - python=3.7 - pytorch=1.8.0 - scikit-learn=0.23.2 - pip - pip: - -r file:requirements.txt {code} requirements.txt {code:yaml} apache-flink==1.12.0 deepctr-torch==0.2.6 black==20.8b1 confluent-kafka==1.6.0 pytest==6.2.2 testcontainers==3.4.0 kafka-python==2.0.2 {code} End result: the tar.gz is 854M, the zip is 1.6G. So, long story short, python-archives only supports zip, while zip is not a good choice for packaging ML libs. Let's change this by adding tar.gz support to python-archives. The change will happen in this way: in ProcessPythonEnvironmentManager.java, check the suffix; if it is tar.gz, unarchive it using a gzip decompressor. was: [python-archives|https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/python_config.html#python-archives] currently only takes zip. In our use case, we want to package the whole conda environment into python-archives, similar to how the [docs|https://ci.apache.org/projects/flink/flink-docs-stable/dev/python/faq.html#cluster] suggest about using venv (Python virtual environment). As we use PyFlink for ML, there are inevitably a few large dependencies (tensorflow, torch, pyarrow), as well as a lot of small dependencies. This pattern is not friendly for zip. According to the [post|https://superuser.com/a/173825], zip compresses each file independently, and it is not performing good when dealing with a lot of small files. On the other hand, tar simply bundles all files into a tarball, then we can apply gzip to the whole tarball to achieve smaller size. 
This may explain why the official packaging tool - conda pack - [conda pack|https://conda.github.io/conda-pack/] - produces tar.gz by default, even though zip is an option if we really want to. To further prove the idea, I use my laptop and conda env to run an experiment. My OS: macOS 10.15.7 # Create an environment.yaml as well as a requirements.txt # Run `conda env create -f environment.yaml` to create the conda env # Run conda pack to produce a tar.gz # Run conda pack faetflow-ml-env.zip to produce a zip More details: environment.yaml {code:txt} name: featflow-ml-env channels: - pytorch - conda-forge - defaults dependencies: - python=3.7 - pytorch=1.8.0 - scikit-learn=0.23.2 - pip - pip: - -r file:requirements.txt {code} requirements.txt ``` apache-flink==1.12.0 deepctr-torch==0.2.6 black==20.8b1 confluent-kafka==1.6.0 pytest==6.2.2 testcontainers==3.4.0 kafka-python==2.0.2 ``` End result: the tar.gz is 854M, the zip is 1.6G So, long story short, python-archives only support zip, while zip is not a good choice for packaging ML libs. Let's change this by adding python-archives tar.gz support. Change will happen in this way: In ProcessPythonEnvironmentManager.java, check the suffix. If tar.gz, unarchive it using gzip decompresser. > Have python-archives also take tar.gz > - > > Key: FLINK-22519 > URL: https://issues.apache.org/jira/browse/FLINK-22519 > Project: Flink > Issue Type: New Feature > Components: API / Python >Reporter: Yik San Chan >Priority: Major > >
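The change proposed above (check the archive suffix, and use a gzip-aware unarchiver for tar.gz) can be sketched as follows. This is a minimal illustration in Python, not the actual change, which would live in the Java class ProcessPythonEnvironmentManager; the helper name `extract_archive` is hypothetical:

```python
import tarfile
import zipfile


def extract_archive(path: str, target_dir: str) -> None:
    """Extract a python-archives file, dispatching on the filename suffix.

    Sketch of the proposal: .zip keeps working as before, while .tar.gz is
    opened in gzip mode so the whole compressed tarball can be unpacked.
    """
    if path.endswith((".tar.gz", ".tgz")):
        # "r:gz" transparently decompresses the gzip layer around the tarball.
        with tarfile.open(path, "r:gz") as tf:
            tf.extractall(target_dir)
    elif path.endswith(".zip"):
        with zipfile.ZipFile(path) as zf:
            zf.extractall(target_dir)
    else:
        raise ValueError("unsupported archive suffix: " + path)
```

The same dispatch works for any suffix-keyed format table; the key point is that only the outer container changes, so the rest of the environment-setup logic is untouched.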
[jira] [Commented] (FLINK-20951) IllegalArgumentException when reading Hive parquet table if condition not contain all partitioned fields
[ https://issues.apache.org/jira/browse/FLINK-20951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17335129#comment-17335129 ] Rui Li commented on FLINK-20951: Removing the fix version since this is not easy to reproduce. And there's chance that it's related to FLINK-22202. > IllegalArgumentException when reading Hive parquet table if condition not > contain all partitioned fields > > > Key: FLINK-20951 > URL: https://issues.apache.org/jira/browse/FLINK-20951 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive >Affects Versions: 1.12.0 > Environment: flink 1.12.0release-12 > sql-cli >Reporter: YUJIANBO >Assignee: Rui Li >Priority: Critical > Labels: stale-assigned > Fix For: 1.13.0, 1.12.4 > > > The production hive table is partitioned by two fields:datekey and event > I have do this test by Flink-sql-cli:(Spark Sql All is OK) > (1)First: > SELECT vid From table_A WHERE datekey = '20210112' AND event = 'XXX' AND vid > = 'aa';(OK) > SELECT vid From table_A WHERE datekey = '20210112' AND vid = 'aa'; > (Error) > (2)Second: > SELECT vid From table_B WHERE datekey = '20210112' AND event = 'YYY' AND vid > = 'bb';(OK) > SELECT vid From table_B WHERE datekey = '20210112' AND vid = 'bb'; > (Error) > The exception is: > {code} > java.lang.RuntimeException: One or more fetchers have encountered exception > at > org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:199) > at > org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:154) > at > org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:116) > at > org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:273) > at > org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:67) > at > 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.RuntimeException: SplitFetcher thread 19 received > unexpected exception while polling the records > at > org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:146) > at > org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:101) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > ... 
1 more > Caused by: java.lang.IllegalArgumentException > at java.nio.Buffer.position(Buffer.java:244) > at > org.apache.flink.hive.shaded.parquet.io.api.Binary$ByteBufferBackedBinary.getBytes(Binary.java:424) > at > org.apache.flink.hive.shaded.formats.parquet.vector.reader.BytesColumnReader.readBatchFromDictionaryIds(BytesColumnReader.java:79) > at > org.apache.flink.hive.shaded.formats.parquet.vector.reader.BytesColumnReader.readBatchFromDictionaryIds(BytesColumnReader.java:33) > at > org.apache.flink.hive.shaded.formats.parquet.vector.reader.AbstractColumnReader.readToVector(AbstractColumnReader.java:199) > at > org.apache.flink.hive.shaded.formats.parquet.ParquetVectorizedInputFormat$ParquetReader.nextBatch(ParquetVectorizedInputFormat.java:359) > at > org.apache.flink.hive.shaded.formats.parquet.ParquetVectorizedInputFormat$ParquetReader.readBatch(ParquetVectorizedInputFormat.java:328) > at > org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:67) > at > org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:56) > at >
[jira] [Updated] (FLINK-22519) Have python-archives also take tar.gz
[ https://issues.apache.org/jira/browse/FLINK-22519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yik San Chan updated FLINK-22519: - Description: [python-archives|https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/python_config.html#python-archives] currently only takes zip. In our use case, we want to package the whole conda environment into python-archives, similar to how the [docs|https://ci.apache.org/projects/flink/flink-docs-stable/dev/python/faq.html#cluster] suggest about using venv (Python virtual environment). As we use PyFlink for ML, there are inevitably a few large dependencies (tensorflow, torch, pyarrow), as well as a lot of small dependencies. This pattern is not friendly for zip. According to the [post|https://superuser.com/a/173825], zip compresses each file independently, and it is not performing good when dealing with a lot of small files. On the other hand, tar simply bundles all files into a tarball, then we can apply gzip to the whole tarball to achieve smaller size. This may explain why the official packaging tool - conda pack - [conda pack|https://conda.github.io/conda-pack/] - produces tar.gz by default, even though zip is an option if we really want to. To further prove the idea, I use my laptop and conda env to run an experiment. 
My OS: macOS 10.15.7 # Create an environment.yaml as well as a requirements.txt # Run `conda env create -f environment.yaml` to create the conda env # Run conda pack to produce a tar.gz # Run conda pack faetflow-ml-env.zip to produce a zip More details: environment.yaml {code:txt} name: featflow-ml-env channels: - pytorch - conda-forge - defaults dependencies: - python=3.7 - pytorch=1.8.0 - scikit-learn=0.23.2 - pip - pip: - -r file:requirements.txt {code} requirements.txt ``` apache-flink==1.12.0 deepctr-torch==0.2.6 black==20.8b1 confluent-kafka==1.6.0 pytest==6.2.2 testcontainers==3.4.0 kafka-python==2.0.2 ``` End result: the tar.gz is 854M, the zip is 1.6G So, long story short, python-archives only support zip, while zip is not a good choice for packaging ML libs. Let's change this by adding python-archives tar.gz support. Change will happen in this way: In ProcessPythonEnvironmentManager.java, check the suffix. If tar.gz, unarchive it using gzip decompresser. was: [python-archives|https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/python_config.html#python-archives] currently only takes zip. In our use case, we want to package the whole conda environment into python-archives, similar to how the [docs|https://ci.apache.org/projects/flink/flink-docs-stable/dev/python/faq.html#cluster] suggest about using venv (Python virtual environment). As we use PyFlink for ML, there are inevitably a few large dependencies (tensorflow, torch, pyarrow), as well as a lot of small dependencies. This pattern is not friendly for zip. According to the [post|https://superuser.com/a/173825], zip compresses each file independently, and it is not performing good when dealing with a lot of small files. On the other hand, tar simply bundles all files into a tarball, then we can apply gzip to the whole tarball to achieve smaller size. 
This may explain why the official packaging tool - conda pack - [conda pack|https://conda.github.io/conda-pack/] - produces tar.gz by default, even though zip is an option if we really want to. To further prove the idea, I use my laptop and conda env to run an experiment. My OS: macOS 10.15.7 # Create an environment.yaml as well as a requirements.txt # Run `conda env create -f environment.yaml` to create the conda env # Run conda pack to produce a tar.gz # Run conda pack faetflow-ml-env.zip to produce a zip More details: environment.yaml ``` name: featflow-ml-env channels: - pytorch - conda-forge - defaults dependencies: - python=3.7 - pytorch=1.8.0 - scikit-learn=0.23.2 - pip - pip: - -r file:requirements.txt ``` requirements.txt ``` apache-flink==1.12.0 deepctr-torch==0.2.6 black==20.8b1 confluent-kafka==1.6.0 pytest==6.2.2 testcontainers==3.4.0 kafka-python==2.0.2 ``` End result: the tar.gz is 854M, the zip is 1.6G So, long story short, python-archives only support zip, while zip is not a good choice for packaging ML libs. Let's change this by adding python-archives tar.gz support. Change will happen in this way: In ProcessPythonEnvironmentManager.java, check the suffix. If tar.gz, unarchive it using gzip decompresser. > Have python-archives also take tar.gz > - > > Key: FLINK-22519 > URL: https://issues.apache.org/jira/browse/FLINK-22519 > Project: Flink > Issue Type: New Feature > Components: API / Python >Reporter: Yik San Chan >Priority: Major > >
[jira] [Updated] (FLINK-22519) Have python-archives also take tar.gz
[ https://issues.apache.org/jira/browse/FLINK-22519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yik San Chan updated FLINK-22519: - Description: [python-archives|https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/python_config.html#python-archives] currently only takes zip. In our use case, we want to package the whole conda environment into python-archives, similar to how the [docs|https://ci.apache.org/projects/flink/flink-docs-stable/dev/python/faq.html#cluster] suggest about using venv (Python virtual environment). As we use PyFlink for ML, there are inevitably a few large dependencies (tensorflow, torch, pyarrow), as well as a lot of small dependencies. This pattern is not friendly for zip. According to the [post|https://superuser.com/a/173825], zip compresses each file independently, and it is not performing good when dealing with a lot of small files. On the other hand, tar simply bundles all files into a tarball, then we can apply gzip to the whole tarball to achieve smaller size. This may explain why the official packaging tool - conda pack - [conda pack|https://conda.github.io/conda-pack/] - produces tar.gz by default, even though zip is an option if we really want to. To further prove the idea, I use my laptop and conda env to run an experiment. 
My OS: macOS 10.15.7 # Create an environment.yaml as well as a requirements.txt # Run `conda env create -f environment.yaml` to create the conda env # Run conda pack to produce a tar.gz # Run conda pack faetflow-ml-env.zip to produce a zip More details: environment.yaml ``` name: featflow-ml-env channels: - pytorch - conda-forge - defaults dependencies: - python=3.7 - pytorch=1.8.0 - scikit-learn=0.23.2 - pip - pip: - -r file:requirements.txt ``` requirements.txt ``` apache-flink==1.12.0 deepctr-torch==0.2.6 black==20.8b1 confluent-kafka==1.6.0 pytest==6.2.2 testcontainers==3.4.0 kafka-python==2.0.2 ``` End result: the tar.gz is 854M, the zip is 1.6G So, long story short, python-archives only support zip, while zip is not a good choice for packaging ML libs. Let's change this by adding python-archives tar.gz support. Change will happen in this way: In ProcessPythonEnvironmentManager.java, check the suffix. If tar.gz, unarchive it using gzip decompresser. was: [python-archives|https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/python_config.html#python-archives] currently only takes zip. In our use case, we want to package the whole conda environment into python-archives, similar to how the [docs|https://ci.apache.org/projects/flink/flink-docs-stable/dev/python/faq.html#cluster] suggest about using venv (Python virtual environment). As we use PyFlink for ML, there are inevitably a few large dependencies (tensorflow, torch, pyarrow), as well as a lot of small dependencies. This pattern is not friendly for zip. According to the [post|https://superuser.com/a/173825], zip compresses each file independently, and it is not performing good when dealing with a lot of small files. On the other hand, tar simply bundles all files into a tarball, then we can apply gzip to the whole tarball to achieve smaller size. 
This may explain why the official packaging tool - conda pack - [conda pack|https://conda.github.io/conda-pack/] - produces tar.gz by default, even though zip is an option if we really want to. To further prove the idea, I use my laptop and conda env to run an experiment. My OS: macOS 10.15.7 # Create an environment.yaml as well as a requirements.txt # Run `conda env create -f environment.yaml` to create the conda env # Run conda pack to produce a tar.gz # Run conda pack faetflow-ml-env.zip to produce a zip More details ``` # environment.yaml name: featflow-ml-env channels: - pytorch - conda-forge - defaults dependencies: - python=3.7 - pytorch=1.8.0 - scikit-learn=0.23.2 - pip - pip: - -r file:requirements.txt ``` ``` #requirements.txt apache-flink==1.12.0 deepctr-torch==0.2.6 black==20.8b1 confluent-kafka==1.6.0 pytest==6.2.2 testcontainers==3.4.0 kafka-python==2.0.2 ``` End result: the tar.gz is 854M, the zip is 1.6G So, long story short, python-archives only support zip, while zip is not a good choice for packaging ML libs. Let's change this by adding python-archives tar.gz support. Change will happen in this way: In ProcessPythonEnvironmentManager.java, check the suffix. If tar.gz, unarchive it using gzip decompresser. > Have python-archives also take tar.gz > - > > Key: FLINK-22519 > URL: https://issues.apache.org/jira/browse/FLINK-22519 > Project: Flink > Issue Type: New Feature > Components: API / Python >Reporter: Yik San Chan >Priority: Major > >
[jira] [Updated] (FLINK-22519) Have python-archives also take tar.gz
[ https://issues.apache.org/jira/browse/FLINK-22519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yik San Chan updated FLINK-22519: - Description: [python-archives|https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/python_config.html#python-archives] currently only takes zip. In our use case, we want to package the whole conda environment into python-archives, similar to how the [docs|https://ci.apache.org/projects/flink/flink-docs-stable/dev/python/faq.html#cluster] suggest about using venv (Python virtual environment). As we use PyFlink for ML, there are inevitably a few large dependencies (tensorflow, torch, pyarrow), as well as a lot of small dependencies. This pattern is not friendly for zip. According to the [post|https://superuser.com/a/173825], zip compresses each file independently, and it is not performing good when dealing with a lot of small files. On the other hand, tar simply bundles all files into a tarball, then we can apply gzip to the whole tarball to achieve smaller size. This may explain why the official packaging tool - conda pack - [conda pack|https://conda.github.io/conda-pack/] - produces tar.gz by default, even though zip is an option if we really want to. To further prove the idea, I use my laptop and conda env to run an experiment. 
My OS: macOS 10.15.7 # Create an environment.yaml as well as a requirements.txt # Run `conda env create -f environment.yaml` to create the conda env # Run conda pack to produce a tar.gz # Run conda pack faetflow-ml-env.zip to produce a zip More details ``` # environment.yaml name: featflow-ml-env channels: - pytorch - conda-forge - defaults dependencies: - python=3.7 - pytorch=1.8.0 - scikit-learn=0.23.2 - pip - pip: - -r file:requirements.txt ``` ``` #requirements.txt apache-flink==1.12.0 deepctr-torch==0.2.6 black==20.8b1 confluent-kafka==1.6.0 pytest==6.2.2 testcontainers==3.4.0 kafka-python==2.0.2 ``` End result: the tar.gz is 854M, the zip is 1.6G So, long story short, python-archives only support zip, while zip is not a good choice for packaging ML libs. Let's change this by adding python-archives tar.gz support. Change will happen in this way: In ProcessPythonEnvironmentManager.java, check the suffix. If tar.gz, unarchive it using gzip decompresser. was: [python-archives|https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/python_config.html#python-archives] currently only takes zip. In our use case, we want to package the whole conda environment into python-archives, similar to how the [docs|https://ci.apache.org/projects/flink/flink-docs-stable/dev/python/faq.html#cluster] suggest about using venv (Python virtual environment). As we use PyFlink for ML, there are inevitably a few large dependencies (tensorflow, torch, pyarrow), as well as a lot of small dependencies. This pattern is not friendly for zip. According to the [post|https://superuser.com/a/173825], zip compresses each file independently, and it is not performing good when dealing with a lot of small files. On the other hand, tar simply bundles all files into a tarball, then we can apply gzip to the whole tarball to achieve smaller size. 
This may explain why the official packaging tool - conda pack - [conda pack|https://conda.github.io/conda-pack/] - produces tar.gz by default, even though zip is an option if we really want to. To further prove the idea, I use my laptop and conda env to run an experiment. My OS: macOS 10.15.7 # Create an environment.yaml as well as a requirements.txt # Run `conda env create -f environment.yaml` to create the conda env # Run conda pack to produce a tar.gz # Run conda pack faetflow-ml-env.zip to produce a zip # environment.yaml name: featflow-ml-env channels: - pytorch - conda-forge - defaults dependencies: - python=3.7 - pytorch=1.8.0 - scikit-learn=0.23.2 - pip - pip: - -r file:requirements.txt #requirements.txt apache-flink==1.12.0 deepctr-torch==0.2.6 black==20.8b1 confluent-kafka==1.6.0 pytest==6.2.2 testcontainers==3.4.0 kafka-python==2.0.2 End result: the tar.gz is 854M, the zip is 1.6G So, long story short, python-archives only support zip, while zip is not a good choice for packaging ML libs. Let's change this by adding python-archives tar.gz support. Change will happen in this way: In ProcessPythonEnvironmentManager.java, check the suffix. If tar.gz, unarchive it using gzip decompresser. > Have python-archives also take tar.gz > - > > Key: FLINK-22519 > URL: https://issues.apache.org/jira/browse/FLINK-22519 > Project: Flink > Issue Type: New Feature > Components: API / Python >Reporter: Yik San Chan >Priority: Major > >
[jira] [Created] (FLINK-22519) Have python-archives also take tar.gz
Yik San Chan created FLINK-22519: Summary: Have python-archives also take tar.gz Key: FLINK-22519 URL: https://issues.apache.org/jira/browse/FLINK-22519 Project: Flink Issue Type: New Feature Components: API / Python Reporter: Yik San Chan [python-archives|https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/python_config.html#python-archives] currently only takes zip. In our use case, we want to package the whole conda environment into python-archives, similar to how the [docs|https://ci.apache.org/projects/flink/flink-docs-stable/dev/python/faq.html#cluster] suggest about using venv (Python virtual environment). As we use PyFlink for ML, there are inevitably a few large dependencies (tensorflow, torch, pyarrow), as well as a lot of small dependencies. This pattern is not friendly for zip. According to the [post|https://superuser.com/a/173825], zip compresses each file independently, and it is not performing good when dealing with a lot of small files. On the other hand, tar simply bundles all files into a tarball, then we can apply gzip to the whole tarball to achieve smaller size. This may explain why the official packaging tool - conda pack - [conda pack|https://conda.github.io/conda-pack/] - produces tar.gz by default, even though zip is an option if we really want to. To further prove the idea, I use my laptop and conda env to run an experiment. 
My OS: macOS 10.15.7 # Create an environment.yaml as well as a requirements.txt # Run `conda env create -f environment.yaml` to create the conda env # Run conda pack to produce a tar.gz # Run conda pack faetflow-ml-env.zip to produce a zip # environment.yaml name: featflow-ml-env channels: - pytorch - conda-forge - defaults dependencies: - python=3.7 - pytorch=1.8.0 - scikit-learn=0.23.2 - pip - pip: - -r file:requirements.txt #requirements.txt apache-flink==1.12.0 deepctr-torch==0.2.6 black==20.8b1 confluent-kafka==1.6.0 pytest==6.2.2 testcontainers==3.4.0 kafka-python==2.0.2 End result: the tar.gz is 854M, the zip is 1.6G So, long story short, python-archives only support zip, while zip is not a good choice for packaging ML libs. Let's change this by adding python-archives tar.gz support. Change will happen in this way: In ProcessPythonEnvironmentManager.java, check the suffix. If tar.gz, unarchive it using gzip decompresser. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (FLINK-22518) Translate the page of "High Availability (HA)" into Chinese
[ https://issues.apache.org/jira/browse/FLINK-22518?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu reassigned FLINK-22518: --- Assignee: movesan Yang > Translate the page of "High Availability (HA)" into Chinese > --- > > Key: FLINK-22518 > URL: https://issues.apache.org/jira/browse/FLINK-22518 > Project: Flink > Issue Type: New Feature > Components: chinese-translation >Reporter: movesan Yang >Assignee: movesan Yang >Priority: Major > > > The model of "High Availability (HA)" contains the following three pages: > [https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/deployment/ha] > [https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/deployment/ha/zookeeper_ha.html|https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/deployment/ha/zookeeper_ha.html,] > [https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/deployment/ha/kubernetes_ha.html] > The markdown file can be found in > [https://github.com/apache/flink/tree/master/docs/content.zh/docs/deployment/ha] > in English. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-22497) When using DefaultRollingPolicy in StreamingFileSink, the file will be finished delayed
[ https://issues.apache.org/jira/browse/FLINK-22497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ChangjiGuo updated FLINK-22497: --- Environment: hadoop-2.8.4,Flink-1.11.2 (was: hadoop-2.8.4) > When using DefaultRollingPolicy in StreamingFileSink, the file will be > finished delayed > --- > > Key: FLINK-22497 > URL: https://issues.apache.org/jira/browse/FLINK-22497 > Project: Flink > Issue Type: Improvement > Components: Connectors / FileSystem >Affects Versions: 1.11.2 > Environment: hadoop-2.8.4,Flink-1.11.2 >Reporter: ChangjiGuo >Priority: Major > Attachments: 1.png > > > I had a doubt when testing StreamingFileSink: > The default 60s rolling interval in DefaultRollingPolicy is detected by > procTimeService. If the rolling interval is not met this time, it will be > delayed to the next timer trigger point (after 60s), so this is not > real-time. For example, if the checkpoint period is set to 60s, the file > should be converted to finished at the second checkpoint, but it will be > delayed to the third checkpoint. > You can refer to the attached picture for detail. > If we add rollingPolicy.shouldRollOnProcessingTime to the if condition of > Bucket.write method, the file will be set to finished as we expect at the > second checkpoint. > {code:java} > void write(IN element, long currentTime) throws IOException { > if (inProgressPart == null || > rollingPolicy.shouldRollOnEvent(inProgressPart, element) || > rollingPolicy.shouldRollOnProcessingTime(inProgressPart, currentTime)) { > if (LOG.isDebugEnabled()) { > LOG.info("Subtask {} closing in-progress part file for bucket > id={} due to element {}.", subtaskIndex, bucketId, element); > } > rollPartFile(currentTime); > } > inProgressPart.write(element, currentTime); > } > {code} > > Is my understanding correct? Or can we do this? > Thanks! ^_^ -- This message was sent by Atlassian Jira (v8.3.4#803005)
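The timing argument in the report above can be illustrated with a small standalone simulation (illustrative numbers only, not Flink code). A rollover condition that is evaluated only when a periodic processing-time timer fires is finalized at the first timer fire after the interval elapses, while a condition also evaluated on every write closes the file as soon as the next element arrives:

```python
# Illustrative simulation of a DefaultRollingPolicy-style rollover interval.
ROLL_INTERVAL = 60  # seconds


def should_roll(created_at: int, now: int) -> bool:
    """True once the in-progress part file is older than the interval."""
    return now - created_at >= ROLL_INTERVAL


created_at = 5                # part file opened at t=5s
timer_fires = [60, 120, 180]  # processing-time timers every 60s

# Timer-only check: at t=60 only 55s have elapsed, so the roll is deferred
# to the next timer fire at t=120.
timer_roll = next(t for t in timer_fires if should_roll(created_at, t))

# If the write path also checked the condition, an element arriving at
# t=66 (61s after creation) would already trigger the roll.
element_arrival = 66
write_roll = element_arrival if should_roll(created_at, element_arrival) else timer_roll
```

Here `timer_roll` lands a full timer period later than `write_roll`, which mirrors the report's observation that the file is finished one checkpoint later than expected.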
[jira] [Commented] (FLINK-22499) JDBC sink table-api support "sink.parallelism" ?
[ https://issues.apache.org/jira/browse/FLINK-22499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17335122#comment-17335122 ] ranqiqiang commented on FLINK-22499: Thanks, but the version is "jdbc .. 1.14-SNAPSHOT". If I use Flink 1.12.2, can I then use the jdbc-connector 1.14-SNAPSHOT? > JDBC sink table-api support "sink.parallelism" ? > -- > > Key: FLINK-22499 > URL: https://issues.apache.org/jira/browse/FLINK-22499 > Project: Flink > Issue Type: New Feature > Components: Connectors / JDBC >Affects Versions: 1.12.1 >Reporter: ranqiqiang >Priority: Major > > I receive messages from Kafka, then sink to MySQL/TiDB via the SQL > table API. > How can I change the sink parallelism ? -- This message was sent by Atlassian Jira (v8.3.4#803005)
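For reference, 'sink.parallelism' is declared per table in the sink DDL's WITH clause in Flink versions whose JDBC connector supports the option (the thread indicates it landed after 1.12). A hypothetical sketch, with placeholder connection properties:

```python
# Hypothetical sink DDL: all names and connection properties below are
# placeholders; only the 'sink.parallelism' option is the point.
ddl = """
CREATE TABLE mysql_sink (
  id BIGINT,
  name STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/db',  -- placeholder URL
  'table-name' = 'users',                    -- placeholder table
  'sink.parallelism' = '4'                   -- sink operator parallelism
)
"""
```

In PyFlink this string would be passed to a TableEnvironment via execute_sql; the option overrides the job-wide default parallelism for the sink operator only.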
[jira] [Commented] (FLINK-22266) Harden JobMasterStopWithSavepointITCase
[ https://issues.apache.org/jira/browse/FLINK-22266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17335120#comment-17335120 ] Guowei Ma commented on FLINK-22266: --- https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=17368=logs=8fd9202e-fd17-5b26-353c-ac1ff76c8f28=a0a633b8-47ef-5c5a-2806-3c13b9e48228=4591 > Harden JobMasterStopWithSavepointITCase > --- > > Key: FLINK-22266 > URL: https://issues.apache.org/jira/browse/FLINK-22266 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing, Runtime / Coordination >Affects Versions: 1.13.0, 1.14.0 >Reporter: Guowei Ma >Assignee: Dawid Wysakowicz >Priority: Critical > Labels: test-stability > Fix For: 1.13.0 > > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=16451=logs=5c8e7682-d68f-54d1-16a2-a09310218a49=f508e270-48d6-5f1e-3138-42a17e0714f0=3884 > {code:java} > [ERROR] > throwingExceptionOnCallbackWithNoRestartsShouldFailTheTerminate(org.apache.flink.runtime.jobmaster.JobMasterStopWithSavepointITCase) > Time elapsed: 0.154 s <<< FAILURE! 
> java.lang.AssertionError > at org.junit.Assert.fail(Assert.java:86) > at org.junit.Assert.assertTrue(Assert.java:41) > at org.junit.Assert.assertTrue(Assert.java:52) > at > org.apache.flink.runtime.jobmaster.JobMasterStopWithSavepointITCase.throwingExceptionOnCallbackWithoutRestartsHelper(JobMasterStopWithSavepointITCase.java:154) > at > org.apache.flink.runtime.jobmaster.JobMasterStopWithSavepointITCase.throwingExceptionOnCallbackWithNoRestartsShouldFailTheTerminate(JobMasterStopWithSavepointITCase.java:138) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298) > at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at java.lang.Thread.run(Thread.java:748) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-22518) Translate the page of "High Availability (HA)" into Chinese
movesan created FLINK-22518: --- Summary: Translate the page of "High Availability (HA)" into Chinese Key: FLINK-22518 URL: https://issues.apache.org/jira/browse/FLINK-22518 Project: Flink Issue Type: New Feature Components: chinese-translation Reporter: movesan The module of "High Availability (HA)" contains the following three pages: [https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/deployment/ha] [https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/deployment/ha/zookeeper_ha.html] [https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/deployment/ha/kubernetes_ha.html] The English markdown files can be found at [https://github.com/apache/flink/tree/master/docs/content.zh/docs/deployment/ha]. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] wuchong closed pull request #15745: [FLINK-22304][table] Refactor some interfaces for TVF based window to…
wuchong closed pull request #15745: URL: https://github.com/apache/flink/pull/15745 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-web] wuchong commented on pull request #436: Add Apache Flink release 1.13.0
wuchong commented on pull request #436: URL: https://github.com/apache/flink-web/pull/436#issuecomment-828916305 LGTM. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-22518) Translate the page of "High Availability (HA)" into Chinese
[ https://issues.apache.org/jira/browse/FLINK-22518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17335114#comment-17335114 ] movesan commented on FLINK-22518: - Hi [~jark]. I am very interested in this task. Could you please assign this ticket to me? Thanks a lot. > Translate the page of "High Availability (HA)" into Chinese > --- > > Key: FLINK-22518 > URL: https://issues.apache.org/jira/browse/FLINK-22518 > Project: Flink > Issue Type: New Feature > Components: chinese-translation >Reporter: movesan >Priority: Major > > > The module of "High Availability (HA)" contains the following three pages: > [https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/deployment/ha] > [https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/deployment/ha/zookeeper_ha.html] > [https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/deployment/ha/kubernetes_ha.html] > The markdown files (currently in English) can be found at > [https://github.com/apache/flink/tree/master/docs/content.zh/docs/deployment/ha]. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-16428) Fine-grained network buffer management for backpressure
[ https://issues.apache.org/jira/browse/FLINK-16428?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yingjie Cao updated FLINK-16428: Labels: (was: stale-critical) > Fine-grained network buffer management for backpressure > --- > > Key: FLINK-16428 > URL: https://issues.apache.org/jira/browse/FLINK-16428 > Project: Flink > Issue Type: Improvement > Components: Runtime / Network >Reporter: Zhijiang >Priority: Critical > Fix For: 1.14.0 > > > This is an umbrella ticket for tracking the progress of this improvement. > It is the second ingredient to solve the “checkpoints under backpressure” > problem (together with unaligned checkpoints). It consists of two steps: > * See if we can use less network memory in general for streaming jobs (with > a potentially different distribution of floating buffers on the input side) > * Under backpressure, reduce network memory to have less in-flight data > (needs specification of the algorithm and experiments) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] PatrickRen commented on pull request #15767: [FLINK-22393][docs-zh] Translate the page of "Execution Mode (Batch/Streaming)" into Chinese
PatrickRen commented on pull request #15767: URL: https://github.com/apache/flink/pull/15767#issuecomment-828900787 Thanks for the contribution @95chenjz ! But it looks like a new `.md` file was created for the Chinese documentation. I think the translation should be made in the existing file `docs/content.zh/docs/dev/datastream/execution_mode.md` instead of creating a new one. Also, the `README.md` under `docs` describes how to build the docs and view them locally in your browser. You can preview the site to check that your modifications take effect. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-22517) Fix pickle compatibility problem in different Python versions
Huang Xingbo created FLINK-22517: Summary: Fix pickle compatibility problem in different Python versions Key: FLINK-22517 URL: https://issues.apache.org/jira/browse/FLINK-22517 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: 1.12.3, 1.13.0 Reporter: Huang Xingbo Assignee: Huang Xingbo Since release-1.12, PyFlink has supported Python 3.8. Starting from Python 3.8, the default protocol version used by pickle is protocol 5 (https://www.python.org/dev/peps/pep-0574/), which will raise the following exception if the client uses Python 3.8 to compile the program and the cluster node uses Python 3.7 or Python 3.6 to run the Python UDF: {code:python} ValueError: unsupported pickle protocol: 5 {code} The workaround is to let the Python version used by the client be 3.6 or 3.7. For how to specify the client-side Python execution environment, please refer to the doc (https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/python_config.html#python-client-executable). -- This message was sent by Atlassian Jira (v8.3.4#803005)
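The version skew above can be reproduced in a single interpreter. The sketch below is plain Python, not PyFlink internals: it shows the failure mode (an unpickler rejecting a payload whose protocol is newer than it supports) and the client-side idea of pinning the protocol to one every runtime in the cluster understands (protocol 4 is readable on all of Python 3.4+):

```python
import pickle

# A payload an "older" worker cannot read: the PROTO opcode (0x80) announces
# a protocol number higher than the running interpreter supports. This is the
# same failure mode as the ticket ("unsupported pickle protocol: 5" when a
# 3.6/3.7 worker receives a protocol-5 payload from a 3.8 client). We use
# protocol number 7, which no current interpreter supports, so the error
# reproduces regardless of the local Python version.
too_new = b"\x80\x07N."  # PROTO 7, None, STOP
try:
    pickle.loads(too_new)
except ValueError as e:
    print(e)  # unsupported pickle protocol: 7

# Pinning the protocol on the serializing side keeps payloads readable by
# older runtimes: the header byte after PROTO reflects the chosen protocol.
payload = pickle.dumps({"udf": "state"}, protocol=4)
assert payload[0] == 0x80 and payload[1] == 4  # header announces protocol 4
assert pickle.loads(payload) == {"udf": "state"}
```

Pinning the protocol (or simply matching the client's Python version to the workers', as the ticket suggests) avoids the mismatch.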
[GitHub] [flink] PatrickRen commented on a change in pull request #15763: [FLINK-22364][doc] Translate the page of "Data Sources" to Chinese.
PatrickRen commented on a change in pull request #15763: URL: https://github.com/apache/flink/pull/15763#discussion_r620113572 ## File path: docs/content.zh/docs/dev/datastream/sources.md ## @@ -154,28 +161,32 @@ class MySplitEnumerator implements SplitEnumerator { } ``` -### SourceReader + + +### 源阅读器 + +[源阅读器](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java)是一个运行在Task Managers上的组件,用于处理来自分片的记录。 + +`源阅读器`公布了一个拉动式(pull-based)处理接口。Flink任务会在循环中不断调用 `pollNext(ReaderOutput)` 来轮询来自`源阅读器`的记录。`pollNext(ReaderOutput)` 方法的返回值指示source reader的状态。 Review comment: `源阅读器`提供了一个拉动式(pull-based)处理接口 ## File path: docs/content.zh/docs/dev/datastream/sources.md ## @@ -154,28 +161,32 @@ class MySplitEnumerator implements SplitEnumerator { } ``` -### SourceReader + + +### 源阅读器 + +[源阅读器](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java)是一个运行在Task Managers上的组件,用于处理来自分片的记录。 Review comment: 是一个运行在 Task Manager ~~s~~ 上的组件 ## File path: docs/content.zh/docs/dev/datastream/sources.md ## @@ -354,32 +374,36 @@ environment.fromSource( String sourceName) ``` -The `TimestampAssigner` and `WatermarkGenerator` run transparently as part of the `ReaderOutput`(or `SourceOutput`) so source implementors do not have to implement any timestamp extraction and watermark generation code. +`TimestampAssigner` 和 `WatermarkGenerator` 作为 `ReaderOutput`(或 `SourceOutput`)的一部分透明地运行,因此source实现者不必实现任何时间戳提取和水印生成的代码。 + + - Event Timestamps + 事件时间戳 -Event timestamps are assigned in two steps: +事件时间戳的分配分为以下两步: - 1. The SourceReader may attach the *source record timestamp* to the event, by calling `SourceOutput.collect(event, timestamp)`. - This is relevant only for data sources that are record-based and have timestamps, such as Kafka, Kinesis, Pulsar, or Pravega. - Sources that are not based on records with timestamps (like files) do not have a *source record timestamp*. 
- This step is part of the source connector implementation and not parameterized by the application that uses the source. + 1. 源阅读器可以通过调用 `SourceOutput.collect(event, timestamp)` 将*源记录时间戳*添加到事件中。 + 这只与基于记录并且拥有时间戳特性的数据源有关,例如 Kafka、Kinesis、Pulsar 或 Pravega。 + 不带有时间戳的记录的源(如文件)没有*源记录时间戳*的特性。 + 此步骤是源连接器实现的一部分,不由使用源的应用程序进行参数化设定。 + + 2. 由应用程序配置的 `TimestampAssigner` 分配最终的时间戳。 + `TimestampAssigner` 会查看原始的*源记录时间戳*和事件。分配器可以直接使用*源记录时间戳*或者访问事件的某个字段获得最终的事件时间戳。 + +这种分两步的方法使用户既可以引用源系统中的时间戳,也可以引用事件数据中的时间戳作为事件时间戳。 - 2. The `TimestampAssigner`, which is configured by the application, assigns the final timestamp. - The `TimestampAssigner` sees the original *source record timestamp* and the event. The assigner can use the *source record timestamp* or access a field of the event obtain the final event timestamp. - -This two-step approach allows users to reference both timestamps from the source systems and timestamps in the event's data as the event timestamp. +*注意:*当使用没有*源记录时间戳*的数据源(如文件)并选择*源记录时间戳*作为最终的事件时间戳时,事件的默认时间戳等于 `LONG_MIN` *(=-9,223,372,036,854,775,808)*。 -*Note:* When using a data source without *source record timestamps* (like files) and selecting the *source record timestamp* as the final event timestamp, events will get a default timestamp equal to `LONG_MIN` *(=-9,223,372,036,854,775,808)*. + - Watermark Generation + 水印生成 -Watermark Generators are only active during streaming execution. Batch execution deactivates Watermark Generators; all related operations described below become effectively no-ops. +水印生成器仅在流执行期间会被激活。批处理执行则会停用水印生成器,则下文所述的所有相关操作实际上都变为无操作。 -The data source API supports running watermark generators individually *per split*. That allows Flink to observe the event time progress per split individually, which is important to handle *event time skew* properly and prevent *idle partitions* from holding back the event time progress of the entire application. 
+数据源API支持*每个分片*单独运行水印生成器。这使得Flink可以分别观察每个分片的事件时间进度,这对于正确处理*事件时间偏斜*和防止*空闲分区*阻碍整个应用程序的事件时间进度是很重要的。 {{< img width="80%" src="fig/per_split_watermarks.svg" alt="Watermark Generation in a Source with two Splits." >}} -When implementing a source connector using the *Split Reader API*, this is automatically handled. All implementations based on the Split Reader API have split-aware watermarks out-of-the-box. +使用*分片提取器 API*实现源连接器时,将自动进行处理。所有基于分片提取器 API的实现都具有开箱即用(out-of-the-box)的可识别分片的水印。 Review comment: 使用*分片提取器 API*实现源连接器时 -> 使用*分片阅读器(SplitReader)API* 实现源连接器时 与上文的翻译保持一致 ## File path: docs/content.zh/docs/dev/datastream/sources.md ## @@ -28,111 +28,118 @@ under the License. {{< hint warning >}} -**Note**: This describes the new Data Source API, introduced in Flink 1.11 as part of
[jira] [Comment Edited] (FLINK-20945) flink hive insert heap out of memory
[ https://issues.apache.org/jira/browse/FLINK-20945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17335084#comment-17335084 ] George Song edited comment on FLINK-20945 at 4/29/21, 1:05 AM: --- [~aswinram92] I think we need to move/duplicate this ticket to the parquet-mr project was (Author: yisong): [~aswinram92] I think we need to move this ticket to parquet-mr project > flink hive insert heap out of memory > > > Key: FLINK-20945 > URL: https://issues.apache.org/jira/browse/FLINK-20945 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Ecosystem > Environment: flink 1.12.0 > hive-exec 2.3.5 >Reporter: Bruce GAO >Priority: Major > Labels: stale-major > > When using Flink SQL to insert into Hive from Kafka, heap out of memory > occurs randomly. > The Hive table uses year/month/day/hour as partitions; it seems the maximum heap > space needed corresponds to the number of active partitions (due to Kafka > messages arriving disordered and delayed), which means that if the partition number increases, the > heap space needed also increases, which may cause heap out of memory. > When writing a record, is it possible to take the whole heap space usage into > account in checkBlockSizeReached, or is there some other method to avoid OOM? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-20945) flink hive insert heap out of memory
[ https://issues.apache.org/jira/browse/FLINK-20945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17335084#comment-17335084 ] George Song commented on FLINK-20945: - [~aswinram92] I think we need to move this ticket to the parquet-mr project > flink hive insert heap out of memory > > > Key: FLINK-20945 > URL: https://issues.apache.org/jira/browse/FLINK-20945 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Ecosystem > Environment: flink 1.12.0 > hive-exec 2.3.5 >Reporter: Bruce GAO >Priority: Major > Labels: stale-major > > When using Flink SQL to insert into Hive from Kafka, heap out of memory > occurs randomly. > The Hive table uses year/month/day/hour as partitions; it seems the maximum heap > space needed corresponds to the number of active partitions (due to Kafka > messages arriving disordered and delayed), which means that if the partition number increases, the > heap space needed also increases, which may cause heap out of memory. > When writing a record, is it possible to take the whole heap space usage into > account in checkBlockSizeReached, or is there some other method to avoid OOM? -- This message was sent by Atlassian Jira (v8.3.4#803005)
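The mitigation suggested in the ticket's last paragraph can be sketched abstractly. The following is an illustration only, with hypothetical names — not Flink or parquet-mr code: a global budget bounds the *sum* of buffered bytes across all active partition writers, so heap use no longer grows linearly with the number of open partitions:

```python
# Hypothetical sketch of "take the whole heap space usage into account":
# instead of each partition writer checking only its own block size, a
# manager flushes the largest buffer whenever the aggregate exceeds a budget.

class PartitionWriter:
    def __init__(self, partition):
        self.partition = partition
        self.buffered = 0   # bytes currently held in memory
        self.flushes = 0

    def write(self, record_size):
        self.buffered += record_size

    def flush(self):
        self.flushes += 1
        self.buffered = 0   # buffered rows written out, memory released


class BudgetedWriterManager:
    def __init__(self, global_budget):
        self.global_budget = global_budget
        self.writers = {}

    def write(self, partition, record_size):
        w = self.writers.setdefault(partition, PartitionWriter(partition))
        w.write(record_size)
        # Global check: while over budget, flush the largest buffer first.
        while sum(x.buffered for x in self.writers.values()) > self.global_budget:
            max(self.writers.values(), key=lambda x: x.buffered).flush()


mgr = BudgetedWriterManager(global_budget=100)
for hour in range(10):                       # 10 active hourly partitions
    for _ in range(5):
        mgr.write(f"2021-01-01-{hour:02d}", 3)   # 150 bytes written in total

assert sum(w.buffered for w in mgr.writers.values()) <= 100  # bounded heap
assert any(w.flushes > 0 for w in mgr.writers.values())      # flushing happened
```

The trade-off is smaller Parquet row groups under many active partitions, in exchange for a bounded memory footprint.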
[GitHub] [flink] 95chenjz commented on pull request #15767: [FLINK-22393][docs-zh] Translate the page of "Execution Mode (Batch/Streaming)" into Chinese
95chenjz commented on pull request #15767: URL: https://github.com/apache/flink/pull/15767#issuecomment-828877478 cc @wuchong @xccui @klion26 @PatrickRen @becketqin. Could you review my PR when you're free? No hurry; looking forward to your feedback. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-11681) Add an AbstractMetric to combine the metric definition and metric management.
[ https://issues.apache.org/jira/browse/FLINK-11681?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiangjie Qin closed FLINK-11681. Resolution: Abandoned After some discussion, we have not reached consensus on whether this is necessary. Closing the ticket at this point. We can revisit the metric abstraction if we find an issue. > Add an AbstractMetric to combine the metric definition and metric management. > - > > Key: FLINK-11681 > URL: https://issues.apache.org/jira/browse/FLINK-11681 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Metrics >Reporter: Jiangjie Qin >Priority: Major > Labels: auto-unassigned, pull-request-available > Time Spent: 40m > Remaining Estimate: 0h > > Currently the metric definitions are spread over many different components. > It would be useful to have a class that combines the metric definition and > management. This ticket is to create such an abstraction. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-20833) Expose pluggable interface for exception analysis and metrics reporting in Execution Graph
[ https://issues.apache.org/jira/browse/FLINK-20833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17335080#comment-17335080 ] Zhenqiu Huang commented on FLINK-20833: --- [~rmetzger] [~xintongsong] Would you please help to review this PR? > Expose pluggable interface for exception analysis and metrics reporting in > Execution Graph > --- > > Key: FLINK-20833 > URL: https://issues.apache.org/jira/browse/FLINK-20833 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Affects Versions: 1.12.0 >Reporter: Zhenqiu Huang >Priority: Minor > Labels: auto-unassigned, pull-request-available > > Platform users of Apache Flink usually want to classify the failure reason > (for example, user code, networking, dependencies, etc.) for Flink jobs and > emit metrics for the analyzed results, so that the platform can provide an > accurate measure of system reliability by distinguishing failures caused by > user logic from system issues. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-10806) Support multiple consuming offsets when discovering a new topic
[ https://issues.apache.org/jira/browse/FLINK-10806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17335079#comment-17335079 ] Jiangjie Qin commented on FLINK-10806: -- After FLIP-27, the DataStream source allows users to specify arbitrary offsets to start reading from with a custom {{OffsetsInitializer}}. We may need to expose that option in the Table Source as well. Ping [~jark] > Support multiple consuming offsets when discovering a new topic > --- > > Key: FLINK-10806 > URL: https://issues.apache.org/jira/browse/FLINK-10806 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Affects Versions: 1.6.2, 1.8.1 >Reporter: Jiayi Liao >Priority: Major > Labels: auto-unassigned > > In KafkaConsumerBase, we discover the TopicPartitions and compare them with > the restoredState. This is reasonable when a topic's partitions are scaled. However, > if we add a new topic with a lot of data and restore the Flink program, > the data of the new topic will be consumed from the start, which may not be > what we want. I think this should be an option for developers. -- This message was sent by Atlassian Jira (v8.3.4#803005)
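The behavior the ticket asks for can be sketched in plain Python. The helper below is hypothetical and is not the actual Flink OffsetsInitializer API; it only illustrates the decision the initializer would make: partitions found in the restored state resume from their checkpointed offsets, newly discovered partitions of known topics read from the earliest offset, and entirely new topics start from a configurable position:

```python
# Hypothetical offsets policy, illustrating the option requested in the
# ticket. Names and positions are illustrative, not Flink API.
EARLIEST, LATEST = "earliest", "latest"

def initial_offsets(discovered, restored, new_topic_start=LATEST):
    """Map each discovered (topic, partition) to a starting position."""
    restored_topics = {topic for (topic, _) in restored}
    offsets = {}
    for tp in discovered:
        topic, _ = tp
        if tp in restored:
            offsets[tp] = restored[tp]     # resume from checkpointed offset
        elif topic in restored_topics:
            offsets[tp] = EARLIEST         # scaled-up partition: read it all
        else:
            offsets[tp] = new_topic_start  # whole new topic: configurable
    return offsets

restored = {("orders", 0): 4200, ("orders", 1): 1337}
discovered = [("orders", 0), ("orders", 1), ("orders", 2), ("clicks", 0)]

offsets = initial_offsets(discovered, restored)
assert offsets[("orders", 0)] == 4200      # restored partition
assert offsets[("orders", 2)] == EARLIEST  # new partition of a known topic
assert offsets[("clicks", 0)] == LATEST    # newly discovered topic
```

Making `new_topic_start` configurable is exactly the "option for developers" the reporter asks for.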
[jira] [Assigned] (FLINK-16634) The PartitionDiscoverer in FlinkKafkaConsumer should not use the user provided client.id.
[ https://issues.apache.org/jira/browse/FLINK-16634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiangjie Qin reassigned FLINK-16634: Assignee: Qingsheng Ren > The PartitionDiscoverer in FlinkKafkaConsumer should not use the user > provided client.id. > - > > Key: FLINK-16634 > URL: https://issues.apache.org/jira/browse/FLINK-16634 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.10.0 >Reporter: Jiangjie Qin >Assignee: Qingsheng Ren >Priority: Major > Labels: auto-unassigned > > The {{PartitionDiscoverer}} creates a {{KafkaConsumer}} using the client.id > from the user provided properties. This may cause the MBean to collide with > the fetching {{KafkaConsumer}}. The {{PartitionDiscoverer}} should use a > unique client.id instead, such as "PartitionDiscoverer-RANDOM_LONG" -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Closed] (FLINK-20108) SingleThreadFetcherManager may add splits to a shutting down SplitFetcher
[ https://issues.apache.org/jira/browse/FLINK-20108?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiangjie Qin closed FLINK-20108. Fix Version/s: (was: 1.11.4) 1.11.3 Resolution: Fixed FLINK-18128 has been backported to 1.11.3. e72e48533902fe6a7271310736584e77b64d05b8 > SingleThreadFetcherManager may add splits to a shutting down SplitFetcher > - > > Key: FLINK-20108 > URL: https://issues.apache.org/jira/browse/FLINK-20108 > Project: Flink > Issue Type: Bug > Components: Connectors / Common >Affects Versions: 1.11.2 >Reporter: Jiangjie Qin >Priority: Major > Labels: auto-unassigned > Fix For: 1.11.3 > > > Currently the split fetchers are only removed from > {{SplitFetcherManager.fetchers}} when the thread exits. This may cause problems > because when {{SplitFetcherManager.addSplits()}} is called, it may see a > shutting-down split fetcher and add splits to it. These splits will then > just be lost. > This issue is actually already fixed in FLINK-18128. The fix needs to be > cherry-picked to 1.11.3 -- This message was sent by Atlassian Jira (v8.3.4#803005)
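The race described above, and the shape of the fix, can be modeled in a few lines of plain Python. The class names mirror the ticket but the code is a hypothetical sketch, not the real Flink implementation: the hand-off of splits is made conditional on the fetcher still being alive, and a fresh fetcher is started otherwise, so splits are never handed to a fetcher that is shutting down:

```python
# Illustrative model of the race: without the aliveness check, splits handed
# to a shutting-down fetcher would be silently lost.

class SplitFetcher:
    def __init__(self):
        self.splits = []
        self.shut_down = False

    def add_splits(self, splits):
        if self.shut_down:
            return False          # refuse instead of losing the splits
        self.splits.extend(splits)
        return True


class SingleThreadFetcherManager:
    def __init__(self):
        self.fetcher = None

    def add_splits(self, splits):
        # If there is no fetcher, or the current one refused because it is
        # shutting down, start a fresh fetcher and hand the splits to it.
        if self.fetcher is None or not self.fetcher.add_splits(splits):
            self.fetcher = SplitFetcher()
            assert self.fetcher.add_splits(splits)


mgr = SingleThreadFetcherManager()
mgr.add_splits(["split-0"])
mgr.fetcher.shut_down = True          # fetcher begins shutting down...
old = mgr.fetcher
mgr.add_splits(["split-1"])           # ...while new splits arrive
assert old.splits == ["split-0"]      # nothing added to the dying fetcher
assert mgr.fetcher.splits == ["split-1"]  # a new fetcher got the split
```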
[jira] [Updated] (FLINK-22516) ResourceManager cannot establish leadership
[ https://issues.apache.org/jira/browse/FLINK-22516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ricky Burnett updated FLINK-22516: -- Affects Version/s: 1.9.3 > ResourceManager cannot establish leadership > --- > > Key: FLINK-22516 > URL: https://issues.apache.org/jira/browse/FLINK-22516 > Project: Flink > Issue Type: Bug >Affects Versions: 1.9.3 > Environment: 1.9.3 Flink version, on kubernetes. >Reporter: Ricky Burnett >Priority: Major > Attachments: jobmanager_leadership.log > > > We are running Flink clusters with 2 JobManagers in HA mode. After a > Zookeeper restart, the two JMs begin leadership election and end up in a state where > they are both trying to start their ResourceManager, until one of them > writes to `leader//resource_manager_lock` and the other JobManager's > JobMaster proceeds to execute `notifyOfNewResourceManagerLeader`, which > restarts the ResourceManager. This in turn writes to > `leader//resource_manager_lock`, which triggers the first JobMaster to > restart its ResourceManager. We can see this in the logs from the > "ResourceManager leader changed to new address" message, which goes back and forth > between the two JMs and the two IP addresses. This cycle appears to continue > indefinitely without outside interruption. > I've attached combined logs from two JMs in our environment that got into > this state. The logs start with the loss of connection and end with a couple > of cycles of back and forth. The two relevant hosts are > "flink-jm-828d4aa2-d4d4-457b-995d-feb56d08c1fb-784cdb9c57-tsxb7" and > "flink-jm-828d4aa2-d4d4-457b-995d-feb56d08c1fb-784cdb9c57-mpf9x". > *-tsxb7 appears to be the last host that was granted leadership. 
> {code:java} > {"thread":"Curator-Framework-0-EventThread","level":"INFO","loggerName":"org.apache.flink.runtime.jobmaster.JobManagerRunner","message":"JobManager > runner for job tenant: ssademo, pipeline: > 828d4aa2-d4d4-457b-995d-feb56d08c1fb, name: integration-test-detection > (33e12948df69077ab3b33316eacbb5e4) was granted leadership with session id > 97992805-9c60-40ba-8260-aaf036694cde at > akka.tcp://flink@100.97.92.73:6123/user/jobmanager_3.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","instant":{"epochSecond":1617129712,"nanoOfSecond":44700},"contextMap":{},"threadId":152,"threadPriority":5,"source":{"class":"org.apache.flink.runtime.jobmaster.JobManagerRunner","method":"startJobMaster","file":"JobManagerRunner.java","line":313},"service":"streams","time":"2021-03-30T18:41:52.447UTC","hostname":"flink-jm-828d4aa2-d4d4-457b-995d-feb56d08c1fb-784cdb9c57-tsxb7"} > {code} > But *-mpf9x continues to try to wrestle control back. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-22516) ResourceManager cannot establish leadership
[ https://issues.apache.org/jira/browse/FLINK-22516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ricky Burnett updated FLINK-22516: -- Environment: 1.9.3 Flink version, on kubernetes. > ResourceManager cannot establish leadership > --- > > Key: FLINK-22516 > URL: https://issues.apache.org/jira/browse/FLINK-22516 > Project: Flink > Issue Type: Bug > Environment: 1.9.3 Flink version, on kubernetes. >Reporter: Ricky Burnett >Priority: Major > Attachments: jobmanager_leadership.log > > > We are running Flink clusters with 2 JobManagers in HA mode. After a > Zookeeper restart, the two JMs begin leadership election and end up in a state where > they are both trying to start their ResourceManager, until one of them > writes to `leader//resource_manager_lock` and the other JobManager's > JobMaster proceeds to execute `notifyOfNewResourceManagerLeader`, which > restarts the ResourceManager. This in turn writes to > `leader//resource_manager_lock`, which triggers the first JobMaster to > restart its ResourceManager. We can see this in the logs from the > "ResourceManager leader changed to new address" message, which goes back and forth > between the two JMs and the two IP addresses. This cycle appears to continue > indefinitely without outside interruption. > I've attached combined logs from two JMs in our environment that got into > this state. The logs start with the loss of connection and end with a couple > of cycles of back and forth. The two relevant hosts are > "flink-jm-828d4aa2-d4d4-457b-995d-feb56d08c1fb-784cdb9c57-tsxb7" and > "flink-jm-828d4aa2-d4d4-457b-995d-feb56d08c1fb-784cdb9c57-mpf9x". > *-tsxb7 appears to be the last host that was granted leadership. 
> {code:java} > {"thread":"Curator-Framework-0-EventThread","level":"INFO","loggerName":"org.apache.flink.runtime.jobmaster.JobManagerRunner","message":"JobManager > runner for job tenant: ssademo, pipeline: > 828d4aa2-d4d4-457b-995d-feb56d08c1fb, name: integration-test-detection > (33e12948df69077ab3b33316eacbb5e4) was granted leadership with session id > 97992805-9c60-40ba-8260-aaf036694cde at > akka.tcp://flink@100.97.92.73:6123/user/jobmanager_3.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","instant":{"epochSecond":1617129712,"nanoOfSecond":44700},"contextMap":{},"threadId":152,"threadPriority":5,"source":{"class":"org.apache.flink.runtime.jobmaster.JobManagerRunner","method":"startJobMaster","file":"JobManagerRunner.java","line":313},"service":"streams","time":"2021-03-30T18:41:52.447UTC","hostname":"flink-jm-828d4aa2-d4d4-457b-995d-feb56d08c1fb-784cdb9c57-tsxb7"} > {code} > But *-mpf9x continues to try to wrestle control back. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] galenwarren commented on a change in pull request #15599: [FLINK-11838][flink-gs-fs-hadoop] Create Google Storage file system with recoverable writer support
galenwarren commented on a change in pull request #15599: URL: https://github.com/apache/flink/pull/15599#discussion_r622618082 ## File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriterCommitter.java ## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.gs.writer; + +import org.apache.flink.core.fs.RecoverableFsDataOutputStream; +import org.apache.flink.core.fs.RecoverableWriter; +import org.apache.flink.fs.gs.GSFileSystemOptions; +import org.apache.flink.fs.gs.storage.BlobStorage; +import org.apache.flink.fs.gs.utils.BlobUtils; +import org.apache.flink.util.Preconditions; + +import com.google.cloud.storage.BlobId; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; + +/** The committer for the GS recoverable writer. */ +class GSRecoverableWriterCommitter implements RecoverableFsDataOutputStream.Committer { + +/** The underlying blob storage. */ +private final BlobStorage storage; + +/** The GS file system options. */ +private final GSFileSystemOptions options; + +/** The recoverable writer instance. 
*/ +private final GSRecoverableWriter writer; + +/** The recoverable writer state for the commit operation. */ +private final GSRecoverableWriterState state; + +GSRecoverableWriterCommitter( +BlobStorage storage, +GSFileSystemOptions options, +GSRecoverableWriter writer, +GSRecoverableWriterState state) { +this.storage = Preconditions.checkNotNull(storage); +this.options = Preconditions.checkNotNull(options); +this.writer = Preconditions.checkNotNull(writer); +this.state = Preconditions.checkNotNull(state); +} + +@Override +public void commit() throws IOException { + +// compose all the component blob ids into the final blob id. if the component blob ids are +// in the same bucket as the final blob id, this can be done directly. otherwise, we must +// compose to a new temporary blob id in the same bucket as the component blob ids and +// then copy that blob to the final blob location +if (state.finalBlobId.getBucket().equals(state.getTemporaryBucketName(options))) { + +// compose directly to final blob +composeBlobs( +state.getComponentBlobIds(options), +state.finalBlobId, +options.writerContentType); + +} else { + +// compose to a temporary blob id, then copy to final blob id +BlobId intermediateBlobId = state.createTemporaryBlobId(options); +composeBlobs( +state.getComponentBlobIds(options), +intermediateBlobId, +options.writerContentType); +storage.copy(intermediateBlobId, state.finalBlobId); +} + +// clean up after commit +writer.cleanupRecoverableState(state); Review comment: One more thought on recovering from earlier checkpoints. Another way to make this possible would be not to do any cleanup in ```commit``` or ```cleanupRecoverableState``` and leave the responsibility for cleanup of temporary blobs to the caller. The most obvious way for the caller to do this would be to specify a separate bucket for temporary blobs and then to apply a TTL. 
This would make things nice and simple -- temporary files would stay around for the TTL period, and it would be the responsibility of the caller to choose an appropriate TTL to balance recovery and storage needs. The ability to recover wouldn't depend on whether ```commit``` or ```cleanupRecoverableState``` or any other method had been called at some point, it would just depend on the age of the temporary blobs. This actually strikes me as a nice configuration, one that I might like to use. The downside, of course, would be that if this were the default configuration, then callers who write to GS files without supplying any additional configuration (i.e. no options) would leak temporary blobs. If this were not the
[GitHub] [flink] flinkbot edited a comment on pull request #15797: [FLINK-22484][table] Add built-in MAP_KEYS, MAP_VALUES, MAP_FROM_ARRA…
flinkbot edited a comment on pull request #15797: URL: https://github.com/apache/flink/pull/15797#issuecomment-828531728 ## CI report: * 9208f004adafc3997d91921efe3e7d291eb8360f Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17362) * 32b72459e66decd79269d3718f1ee40c3d765775 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17372) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #15797: [FLINK-22484][table] Add built-in MAP_KEYS, MAP_VALUES, MAP_FROM_ARRA…
flinkbot edited a comment on pull request #15797: URL: https://github.com/apache/flink/pull/15797#issuecomment-828531728 ## CI report: * 9208f004adafc3997d91921efe3e7d291eb8360f Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17362) * 32b72459e66decd79269d3718f1ee40c3d765775 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-22516) ResourceManager cannot establish leadership
[ https://issues.apache.org/jira/browse/FLINK-22516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ricky Burnett updated FLINK-22516: -- Description: We are running Flink clusters with 2 JobManagers in HA mode. After a Zookeeper restart, the two JMs begin leadership election and end up in a state where they are both trying to start their ResourceManager, until one of them writes to `leader//resource_manager_lock` and the other JobManager's JobMaster proceeds to execute `notifyOfNewResourceManagerLeader`, which restarts the ResourceManager. This in turn writes to `leader//resource_manager_lock`, which triggers the first JobMaster to restart its ResourceManager. We can see this in the logs from the "ResourceManager leader changed to new address" message, which goes back and forth between the two JMs and the two IP addresses. This cycle appears to continue indefinitely without outside interruption. I've attached combined logs from two JMs in our environment that got into this state. The logs start with the loss of connection and end with a couple of cycles of back and forth. The two relevant hosts are "flink-jm-828d4aa2-d4d4-457b-995d-feb56d08c1fb-784cdb9c57-tsxb7" and "flink-jm-828d4aa2-d4d4-457b-995d-feb56d08c1fb-784cdb9c57-mpf9x". *-tsxb7 appears to be the last host that was granted leadership. 
{code:java} {"thread":"Curator-Framework-0-EventThread","level":"INFO","loggerName":"org.apache.flink.runtime.jobmaster.JobManagerRunner","message":"JobManager runner for job tenant: ssademo, pipeline: 828d4aa2-d4d4-457b-995d-feb56d08c1fb, name: integration-test-detection (33e12948df69077ab3b33316eacbb5e4) was granted leadership with session id 97992805-9c60-40ba-8260-aaf036694cde at akka.tcp://flink@100.97.92.73:6123/user/jobmanager_3.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","instant":{"epochSecond":1617129712,"nanoOfSecond":44700},"contextMap":{},"threadId":152,"threadPriority":5,"source":{"class":"org.apache.flink.runtime.jobmaster.JobManagerRunner","method":"startJobMaster","file":"JobManagerRunner.java","line":313},"service":"streams","time":"2021-03-30T18:41:52.447UTC","hostname":"flink-jm-828d4aa2-d4d4-457b-995d-feb56d08c1fb-784cdb9c57-tsxb7"} {code} But *-mpf9x continues to try to wrestle control back. was: We are running Flink clusters with 2 Jobmanagers in HA mode. After a Zookeeper restart the two JMs begin leadership election end up in state where they are both trying to start their ResourceManager and until one of them writes to `leader//resource_manager_lock` and the JobMaster proceeds to execute `notifyOfNewResourceManagerLeader` which restarts the ResourceManager. This in turn writes to `leader//resource_manager_lock` which triggers the other JobMaster to restart it's ResourceManager. We can see this in the logs from the "ResourceManager leader changed to new address" log, that goes back and forth between the two JMs and the two IP addresses. This cycle appears to continue indefinitely with outside interruption. I've attached combined logs from two JMs in our environment that got into this state. The logs start with the loss of connection and end with a couple of cycles of back and forth. 
The two relevant hosts are "flink-jm-828d4aa2-d4d4-457b-995d-feb56d08c1fb-784cdb9c57-tsxb7" and "flink-jm-828d4aa2-d4d4-457b-995d-feb56d08c1fb-784cdb9c57-mpf9x". *-tsxb7 appears to be the last host that was granted leadership. {code:java} {"thread":"Curator-Framework-0-EventThread","level":"INFO","loggerName":"org.apache.flink.runtime.jobmaster.JobManagerRunner","message":"JobManager runner for job tenant: ssademo, pipeline: 828d4aa2-d4d4-457b-995d-feb56d08c1fb, name: integration-test-detection (33e12948df69077ab3b33316eacbb5e4) was granted leadership with session id 97992805-9c60-40ba-8260-aaf036694cde at akka.tcp://flink@100.97.92.73:6123/user/jobmanager_3.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","instant":{"epochSecond":1617129712,"nanoOfSecond":44700},"contextMap":{},"threadId":152,"threadPriority":5,"source":{"class":"org.apache.flink.runtime.jobmaster.JobManagerRunner","method":"startJobMaster","file":"JobManagerRunner.java","line":313},"service":"streams","time":"2021-03-30T18:41:52.447UTC","hostname":"flink-jm-828d4aa2-d4d4-457b-995d-feb56d08c1fb-784cdb9c57-tsxb7"} {code} But *-mpf9x continues to try to wrestle control back. > ResourceManager cannot establish leadership > --- > > Key: FLINK-22516 > URL: https://issues.apache.org/jira/browse/FLINK-22516 > Project: Flink > Issue Type: Bug >Reporter: Ricky Burnett >Priority: Major > Attachments: jobmanager_leadership.log > > > We are running Flink clusters with 2 Jobmanagers in HA mode. After a > Zookeeper restart the two JMs begin leadership
[jira] [Created] (FLINK-22516) ResourceManager cannot establish leadership
Ricky Burnett created FLINK-22516: - Summary: ResourceManager cannot establish leadership Key: FLINK-22516 URL: https://issues.apache.org/jira/browse/FLINK-22516 Project: Flink Issue Type: Bug Reporter: Ricky Burnett Attachments: jobmanager_leadership.log We are running Flink clusters with 2 JobManagers in HA mode. After a ZooKeeper restart the two JMs begin leadership election and end up in a state where they are both trying to start their ResourceManager, until one of them writes to `leader//resource_manager_lock` and the JobMaster executes `notifyOfNewResourceManagerLeader`, which restarts the ResourceManager. This in turn writes to `leader//resource_manager_lock`, which triggers the other JobMaster to restart its ResourceManager. We can see this in the logs from the "ResourceManager leader changed to new address" message, which goes back and forth between the two JMs and the two IP addresses. This cycle appears to continue indefinitely without outside interruption. I've attached combined logs from two JMs in our environment that got into this state. The logs start with the loss of connection and end with a couple of cycles of back and forth. The two relevant hosts are "flink-jm-828d4aa2-d4d4-457b-995d-feb56d08c1fb-784cdb9c57-tsxb7" and "flink-jm-828d4aa2-d4d4-457b-995d-feb56d08c1fb-784cdb9c57-mpf9x". *-tsxb7 appears to be the last host that was granted leadership.
{code:java} {"thread":"Curator-Framework-0-EventThread","level":"INFO","loggerName":"org.apache.flink.runtime.jobmaster.JobManagerRunner","message":"JobManager runner for job tenant: ssademo, pipeline: 828d4aa2-d4d4-457b-995d-feb56d08c1fb, name: integration-test-detection (33e12948df69077ab3b33316eacbb5e4) was granted leadership with session id 97992805-9c60-40ba-8260-aaf036694cde at akka.tcp://flink@100.97.92.73:6123/user/jobmanager_3.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","instant":{"epochSecond":1617129712,"nanoOfSecond":44700},"contextMap":{},"threadId":152,"threadPriority":5,"source":{"class":"org.apache.flink.runtime.jobmaster.JobManagerRunner","method":"startJobMaster","file":"JobManagerRunner.java","line":313},"service":"streams","time":"2021-03-30T18:41:52.447UTC","hostname":"flink-jm-828d4aa2-d4d4-457b-995d-feb56d08c1fb-784cdb9c57-tsxb7"} {code} But *-mpf9x continues to try to wrestle control back. -- This message was sent by Atlassian Jira (v8.3.4#803005)
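The restart cycle described in this report can be sketched as a toy simulation (a hypothetical illustration, not Flink code; the class and method names below are invented for the example). Each leader-change notification makes one JobMaster restart its ResourceManager, which re-publishes the leader path and thereby notifies the other JobMaster, so the queue of pending notifications never drains on its own:

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical model of the FLINK-22516 feedback loop between two JobMasters. */
public class LeaderPingPong {

    /**
     * Runs the notification loop until {@code cap} notifications have fired.
     * Returns the number of notifications delivered, demonstrating that the
     * loop only stops because of the external cap, never by itself.
     */
    static int simulate(int cap) {
        Deque<Integer> pending = new ArrayDeque<>();
        pending.add(0); // initial leadership grant observed by JM 0
        int notifications = 0;
        while (!pending.isEmpty() && notifications < cap) {
            int jm = pending.poll();
            notifications++;
            // Restarting this JM's ResourceManager writes to
            // leader//resource_manager_lock, which notifies the *other* JM,
            // scheduling the next restart.
            pending.add(1 - jm);
        }
        return notifications;
    }

    public static void main(String[] args) {
        // The cycle never terminates on its own; it always hits the cap.
        System.out.println(simulate(100));
    }
}
```

In the real system the cap corresponds to outside interruption; without it, the attached logs show the leader-changed messages ping-ponging between the two hosts indefinitely.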
[GitHub] [flink] flinkbot edited a comment on pull request #15797: [FLINK-22484][table] Add built-in MAP_KEYS, MAP_VALUES, MAP_FROM_ARRA…
flinkbot edited a comment on pull request #15797: URL: https://github.com/apache/flink/pull/15797#issuecomment-828531728 ## CI report: * 9208f004adafc3997d91921efe3e7d291eb8360f Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17362) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #15798: [FLINK-22168][table] Fix several shortcomings that prevent schema expressions
flinkbot edited a comment on pull request #15798: URL: https://github.com/apache/flink/pull/15798#issuecomment-828566333 ## CI report: * d02069bb5bff8be1d10a04cc981caa01fb3919ab Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17364) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #15631: [FLINK-20103][tests] Add UnalignedCheckpointsStressITCase
flinkbot edited a comment on pull request #15631: URL: https://github.com/apache/flink/pull/15631#issuecomment-820458718 ## CI report: * 8db656d8d783b61761d3de7727c0e8831c5e77cd Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17361) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #15479: [FLINK-19606][table-runtime-blink] Introduce StreamExecWindowJoin and window join it cases
flinkbot edited a comment on pull request #15479: URL: https://github.com/apache/flink/pull/15479#issuecomment-812344008 ## CI report: * 1b562e2eec41f48d9ac9b4e7591240b6c8d99e61 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17360) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-22515) Add Documentation for Flink Glue Schema Registry Integration
Linyu Yao created FLINK-22515: - Summary: Add Documentation for Flink Glue Schema Registry Integration Key: FLINK-22515 URL: https://issues.apache.org/jira/browse/FLINK-22515 Project: Flink Issue Type: New Feature Components: Documentation Reporter: Linyu Yao Add documentation for the Flink Glue Schema Registry integration to the Kafka Connector and Kinesis Connector pages: [https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/kafka.html#apache-kafka-connector] [https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/kinesis.html] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink-web] XComp commented on a change in pull request #436: Add Apache Flink release 1.13.0
XComp commented on a change in pull request #436: URL: https://github.com/apache/flink-web/pull/436#discussion_r622363151 ## File path: _posts/2021-04-22-release-1.13.0.md ## @@ -0,0 +1,483 @@ +--- +layout: post +title: "Apache Flink 1.13.0 Release Announcement" +date: 2021-04-22T08:00:00.000Z +categories: news +authors: +- stephan: + name: "Stephan Ewen" + twitter: "StephanEwen" +- dwysakowicz: + name: "Dawid Wysakowicz" + twitter: "dwysakowicz" + +excerpt: The Apache Flink community is excited to announce the release of Flink 1.13.0! Close to xxx contributors worked on over xxx threads to bring significant improvements to usability and observability as well as new features that improve elasticity of Flink’s Application-style deployments. +--- + + +The Apache Flink community is excited to announce the release of Flink 1.13.0! More than 200 +contributors worked on over 1k threads to bring significant improvements to usability and +observability as well as new features that improve elasticity of Flink’s Application-style +deployments. + +This release brings us a big step forward in one of our major efforts: Making Stream Processing +Applications as natural and as simple to manage as any other application. The new reactive scaling +mode means that scaling streaming applications in and out now works like in any other application, +by just changing the number of parallel processes. + +We also added a series of improvements that help users better understand the performance of +applications. When the streams don't flow as fast as you’d hope, these can help you to understand +why: Load and backpressure visualization to identify bottlenecks, CPU flame graphs to identify hot +code paths in your application, and State Access Latencies to see how the State Backends are keeping +up. + +This blog post describes all major new features and improvements, important changes to be aware of +and what to expect moving forward. 
+ +{% toc %} + +We encourage you to [download the release](https://flink.apache.org/downloads.html) and share your +feedback with the community through +the [Flink mailing lists](https://flink.apache.org/community.html#mailing-lists) +or [JIRA](https://issues.apache.org/jira/projects/FLINK/summary). + +## Notable Features and Improvements + +### Reactive mode + +The Reactive Mode is the latest piece in Flink's initiative for making Stream Processing +Applications as natural and as simple to manage as any other application. + +Flink has a dual nature when it comes to resource management and deployments: You can deploy +clusters onto Resource Managers like Kubernetes or Yarn in such a way that Flink actively manages +the resource, and allocates and releases workers as needed. That is especially useful for jobs and +applications that rapidly change their required resources, like batch applications and ad-hoc SQL +queries. The application parallelism rules, the number of workers follows. We call this active +scaling. + +For long running streaming applications, it is often a nicer model to just deploy them like any +other long-running application: The application doesn't really need to know that it runs on K8s, +EKS, Yarn, etc. and doesn't try to acquire a specific amount of workers; instead, it just uses the +number of workers that is given to it. The number of workers rules, the application parallelism +adjusts to that. We call that re-active scaling. + +The Application Deployment Mode started this effort, making deployments application-like (avoiding +having to separate deployment steps to (1) start cluster and (2) submit application). The reactive +scheduler completes this, and you now don't have to use extra tools (scripts or a K8s operator) any +more to keep the number of workers and the application parallelism settings in sync. 
+ +You can now put an auto-scaler around Flink applications like around other typical applications — as +long as you are mindful when configuring the autoscaler that stateful applications still spend +effort in moving state around when scaling. + + +### Bottleneck detection, Backpressure and Idleness Monitoring + +One of the most important metrics to investigate when a job does not consume records as fast as you +would expect is the backpressure ratio. It lets you track down bottlenecks in your pipelines. The +current mechanism had two limitations: +- It was heavy, because it worked by repeatedly taking stack trace samples of your running tasks. +- It was difficult to find out which vertex was the source of backpressure. + +In Flink 1.13, we reworked +the mechanism to include new metrics for the time tasks spend being backpressured, along with a +reworked graphical representation of the job (including a percentage of time particular vertices are +backpressured). + + + + + + +### Support for CPU flame graphs in Web UI + +It is desirable to provide better visibility into the distribution of CPU resources while
[jira] [Commented] (FLINK-22513) Failed to upload compile artifacts
[ https://issues.apache.org/jira/browse/FLINK-22513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17334913#comment-17334913 ] Robert Metzger commented on FLINK-22513: Let's hope this is just some glitch in the infrastructure that's not permanent > Failed to upload compile artifacts > -- > > Key: FLINK-22513 > URL: https://issues.apache.org/jira/browse/FLINK-22513 > Project: Flink > Issue Type: Bug > Components: Build System / Azure Pipelines >Affects Versions: 1.13.0 >Reporter: Dawid Wysakowicz >Priority: Major > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=17343=logs=52b61abe-a3cc-5bde-cc35-1bbe89bb7df5=5e2d176c-a6d4-5b54-7808-b11714e363ad -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot edited a comment on pull request #15796: [FLINK-22511][python] Fix the bug of non-composite result type in Python TableAggregateFunction
flinkbot edited a comment on pull request #15796: URL: https://github.com/apache/flink/pull/15796#issuecomment-828441475 ## CI report: * 4c3e94df03863bf43eac7b47be51764f2f4be659 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17352) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-11158) Do not show empty page for unknown jobs
[ https://issues.apache.org/jira/browse/FLINK-11158?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Kruber closed FLINK-11158. --- Resolution: Fixed Actually, I think this has been solved by now, since the Web UI presents a "job not found" popup when you try to access a non-existing job. > Do not show empty page for unknown jobs > --- > > Key: FLINK-11158 > URL: https://issues.apache.org/jira/browse/FLINK-11158 > Project: Flink > Issue Type: Improvement > Components: Runtime / Web Frontend >Affects Versions: 1.5.5, 1.6.2, 1.7.0 >Reporter: Nico Kruber >Priority: Major > Labels: auto-unassigned > > If you try to access the Web UI using an old/non-existing job ID, e.g. after > a cluster restart, you are currently presented with a white page with no > further info. It should at least contain a "job not found" message to indicate > that there was no other error. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot edited a comment on pull request #15121: The method $(String) is undefined for the type TableExample
flinkbot edited a comment on pull request #15121: URL: https://github.com/apache/flink/pull/15121#issuecomment-793480240 ## CI report: * cf26ce895a7956d258acc073817f578558e78227 UNKNOWN * 709c4110370b845f42d916a06e56c6026cf2fac8 UNKNOWN * 4ea99332e1997eebca1f3f0a9d9229b8265fe32c UNKNOWN * 9a256ec6b644a0f795376076f4e3ff4e85aaad1e UNKNOWN * b7dca63a2e98f09c8c780a21091b5268d897b220 UNKNOWN * 2b7bb05a4dbbb0883db566589dfe1975fdbb279d UNKNOWN * 00d3356d86576e2a129405a92f9ebb2acb5d6412 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17358) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] galenwarren commented on a change in pull request #15599: [FLINK-11838][flink-gs-fs-hadoop] Create Google Storage file system with recoverable writer support
galenwarren commented on a change in pull request #15599: URL: https://github.com/apache/flink/pull/15599#discussion_r622377812 ## File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriterCommitter.java ## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.gs.writer; + +import org.apache.flink.core.fs.RecoverableFsDataOutputStream; +import org.apache.flink.core.fs.RecoverableWriter; +import org.apache.flink.fs.gs.GSFileSystemOptions; +import org.apache.flink.fs.gs.storage.BlobStorage; +import org.apache.flink.fs.gs.utils.BlobUtils; +import org.apache.flink.util.Preconditions; + +import com.google.cloud.storage.BlobId; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; + +/** The committer for the GS recoverable writer. */ +class GSRecoverableWriterCommitter implements RecoverableFsDataOutputStream.Committer { + +/** The underlying blob storage. */ +private final BlobStorage storage; + +/** The GS file system options. */ +private final GSFileSystemOptions options; + +/** The recoverable writer instance. 
*/ +private final GSRecoverableWriter writer; + +/** The recoverable writer state for the commit operation. */ +private final GSRecoverableWriterState state; + +GSRecoverableWriterCommitter( +BlobStorage storage, +GSFileSystemOptions options, +GSRecoverableWriter writer, +GSRecoverableWriterState state) { +this.storage = Preconditions.checkNotNull(storage); +this.options = Preconditions.checkNotNull(options); +this.writer = Preconditions.checkNotNull(writer); +this.state = Preconditions.checkNotNull(state); +} + +@Override +public void commit() throws IOException { + +// compose all the component blob ids into the final blob id. if the component blob ids are +// in the same bucket as the final blob id, this can be done directly. otherwise, we must +// compose to a new temporary blob id in the same bucket as the component blob ids and +// then copy that blob to the final blob location +if (state.finalBlobId.getBucket().equals(state.getTemporaryBucketName(options))) { + +// compose directly to final blob +composeBlobs( +state.getComponentBlobIds(options), +state.finalBlobId, +options.writerContentType); + +} else { + +// compose to a temporary blob id, then copy to final blob id +BlobId intermediateBlobId = state.createTemporaryBlobId(options); +composeBlobs( +state.getComponentBlobIds(options), +intermediateBlobId, +options.writerContentType); +storage.copy(intermediateBlobId, state.finalBlobId); +} + +// clean up after commit +writer.cleanupRecoverableState(state); Review comment: Yes, agreed. I've set up a few test cases locally and watched how the various methods (```cleanupRecoverableState```, ```commit```, etc.) 
get called using both a [DefaultRollingPolicy](https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.html) and a [CheckpointRollingPolicy](https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/CheckpointRollingPolicy.html) with a [StreamingFileSink](https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html). I think I see what's going on here -- I'll share my findings with you and please let me know what you think. First, a class of interest is the [Bucket](https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java) class, which is written to by a streaming file sink and is responsible for interacting with the underlying storage, via a
[GitHub] [flink] galenwarren commented on a change in pull request #15599: [FLINK-11838][flink-gs-fs-hadoop] Create Google Storage file system with recoverable writer support
galenwarren commented on a change in pull request #15599: URL: https://github.com/apache/flink/pull/15599#discussion_r622380775 ## File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriterState.java ## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.gs.writer; + +import org.apache.flink.core.fs.RecoverableWriter; +import org.apache.flink.fs.gs.GSFileSystemOptions; +import org.apache.flink.util.Preconditions; + +import com.google.cloud.storage.BlobId; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; + +/** The state of a recoverable write. */ +class GSRecoverableWriterState implements RecoverableWriter.ResumeRecoverable, Cloneable { + +/** The blob id to which the recoverable write operation is writing. */ +public final BlobId finalBlobId; + +/** The number of bytes that have been written so far. */ +public long bytesWritten; + +/** Indicates if the write has been closed. */ +public boolean closed; + +/** The object ids for the temporary objects that should be composed to form the final blob. 
*/ +public final List componentObjectIds; + +GSRecoverableWriterState( +BlobId finalBlobId, long bytesWritten, boolean closed, List componentObjectIds) { +this.finalBlobId = Preconditions.checkNotNull(finalBlobId); +Preconditions.checkArgument(bytesWritten >= 0); +this.bytesWritten = bytesWritten; +this.closed = closed; + +// shallow copy the component object ids to ensure this state object exclusively +// manages the list of component object ids +this.componentObjectIds = new ArrayList<>(Preconditions.checkNotNull(componentObjectIds)); +} + +GSRecoverableWriterState(GSRecoverableWriterState state) { +this(state.finalBlobId, state.bytesWritten, state.closed, state.componentObjectIds); +} + +GSRecoverableWriterState(BlobId finalBlobId) { +this(finalBlobId, 0, false, new ArrayList<>()); +} + +/** + * Returns the temporary bucket name. If options specifies a temporary bucket name, we use that + * one; otherwise, we use the bucket name of the final blob. + * + * @param options The GS file system options + * @return The temporary bucket name + */ +String getTemporaryBucketName(GSFileSystemOptions options) { +return options.writerTemporaryBucketName.isEmpty() +? finalBlobId.getBucket() +: options.writerTemporaryBucketName; +} Review comment: Yes -- agreed, will do. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] galenwarren commented on pull request #15599: [FLINK-11838][flink-gs-fs-hadoop] Create Google Storage file system with recoverable writer support
galenwarren commented on pull request #15599: URL: https://github.com/apache/flink/pull/15599#issuecomment-828625909 @rmetzger Thanks for your help diagnosing the build issue -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] galenwarren commented on a change in pull request #15599: [FLINK-11838][flink-gs-fs-hadoop] Create Google Storage file system with recoverable writer support
galenwarren commented on a change in pull request #15599:
URL: https://github.com/apache/flink/pull/15599#discussion_r622377812

## File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriterCommitter.java

## @@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.writer;
+
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.fs.gs.GSFileSystemOptions;
+import org.apache.flink.fs.gs.storage.BlobStorage;
+import org.apache.flink.fs.gs.utils.BlobUtils;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.storage.BlobId;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+/** The committer for the GS recoverable writer. */
+class GSRecoverableWriterCommitter implements RecoverableFsDataOutputStream.Committer {
+
+    /** The underlying blob storage. */
+    private final BlobStorage storage;
+
+    /** The GS file system options. */
+    private final GSFileSystemOptions options;
+
+    /** The recoverable writer instance. */
+    private final GSRecoverableWriter writer;
+
+    /** The recoverable writer state for the commit operation. */
+    private final GSRecoverableWriterState state;
+
+    GSRecoverableWriterCommitter(
+            BlobStorage storage,
+            GSFileSystemOptions options,
+            GSRecoverableWriter writer,
+            GSRecoverableWriterState state) {
+        this.storage = Preconditions.checkNotNull(storage);
+        this.options = Preconditions.checkNotNull(options);
+        this.writer = Preconditions.checkNotNull(writer);
+        this.state = Preconditions.checkNotNull(state);
+    }
+
+    @Override
+    public void commit() throws IOException {
+
+        // compose all the component blob ids into the final blob id. if the component blob ids are
+        // in the same bucket as the final blob id, this can be done directly. otherwise, we must
+        // compose to a new temporary blob id in the same bucket as the component blob ids and
+        // then copy that blob to the final blob location
+        if (state.finalBlobId.getBucket().equals(state.getTemporaryBucketName(options))) {
+
+            // compose directly to final blob
+            composeBlobs(
+                    state.getComponentBlobIds(options),
+                    state.finalBlobId,
+                    options.writerContentType);
+
+        } else {
+
+            // compose to a temporary blob id, then copy to final blob id
+            BlobId intermediateBlobId = state.createTemporaryBlobId(options);
+            composeBlobs(
+                    state.getComponentBlobIds(options),
+                    intermediateBlobId,
+                    options.writerContentType);
+            storage.copy(intermediateBlobId, state.finalBlobId);
+        }
+
+        // clean up after commit
+        writer.cleanupRecoverableState(state);

Review comment: Yes, agreed. I've set up a few test cases locally and watched how the various methods (```cleanupRecoverableState```, ```commit```, etc.) get called using both a [DefaultRollingPolicy](https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.html) and a [CheckpointRollingPolicy](https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/CheckpointRollingPolicy.html) with a [StreamingFileSink](https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html). I think I see what's going on here -- I'll share my findings with you and please let me know what you think. First, a class of interest is the [Bucket](https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java) class, which is written to by a streaming file sink and is responsible for interacting with the underlying storage, via a
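The branch in the quoted commit() above can be sketched in isolation. This is a hypothetical, simplified model (plain bucket-name strings instead of BlobIds, and made-up method names), not the actual Flink implementation: compose directly when the component blobs live in the same bucket as the final blob, otherwise compose to an intermediate blob and copy across buckets.

```java
/**
 * Minimal sketch of the commit decision described above. All names are
 * illustrative; the real code operates on BlobId/BlobStorage objects.
 */
public class CommitPathSketch {

    static String commitPlan(String finalBucket, String temporaryBucket) {
        if (finalBucket.equals(temporaryBucket)) {
            // a single compose call writes straight to the final blob
            return "compose-direct";
        }
        // compose into the temporary bucket, then copy to the final location
        return "compose-then-copy";
    }

    public static void main(String[] args) {
        System.out.println(commitPlan("my-bucket", "my-bucket")); // compose-direct
        System.out.println(commitPlan("my-bucket", "my-bucket.inprogress")); // compose-then-copy
    }
}
```

The extra copy step exists only because Google Cloud Storage compose requires source and destination to share a bucket, as the code comment in the diff explains.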
[GitHub] [flink] todd5167 commented on a change in pull request #15755: [FLINK-22318][table] Support RENAME column name for ALTER TABLE state…
todd5167 commented on a change in pull request #15755:
URL: https://github.com/apache/flink/pull/15755#discussion_r622333275

## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java

## @@ -144,6 +149,116 @@ public static Operation convertChangeColumn(
 // TODO: handle watermark and constraints
 }
+
+    public static Operation convertRenameColumn(
+            ObjectIdentifier tableIdentifier,
+            String originColumnName,
+            String newColumnName,
+            CatalogTable catalogTable) {
+
+        Schema modifiedTableSchema = catalogTable.getUnresolvedSchema();
+        validateColumnName(originColumnName, newColumnName, modifiedTableSchema);
+
+        Schema.Builder builder = Schema.newBuilder();
+        // build column
+        modifiedTableSchema.getColumns().stream()
+                .forEach(
+                        column -> {
+                            if (StringUtils.equals(column.getName(), originColumnName)) {
+                                buildNewColumnFromOriginColumn(builder, column, newColumnName);
+                            } else {
+                                buildNewColumnFromOriginColumn(builder, column, column.getName());
+                            }
+                        });
+        // build primary key column
+        List<String> originPrimaryKeyNames =
+                modifiedTableSchema
+                        .getPrimaryKey()
+                        .map(Schema.UnresolvedPrimaryKey::getColumnNames)
+                        .orElseGet(Lists::newArrayList);
+
+        List<String> newPrimaryKeyNames =
+                originPrimaryKeyNames.stream()
+                        .map(
+                                pkName ->
+                                        StringUtils.equals(pkName, originColumnName)
+                                                ? newColumnName
+                                                : pkName)
+                        .collect(Collectors.toList());
+
+        if (newPrimaryKeyNames.size() > 0) {
+            builder.primaryKey(newPrimaryKeyNames);
+        }
+        // build watermark
+        modifiedTableSchema.getWatermarkSpecs().stream()
+                .forEach(
+                        watermarkSpec -> {
+                            String watermarkRefColumnName = watermarkSpec.getColumnName();
+                            Expression watermarkExpression = watermarkSpec.getWatermarkExpression();
+                            if (StringUtils.equals(watermarkRefColumnName, originColumnName)) {
+                                String newWatermarkExpression =
+                                        ((SqlCallExpression) watermarkExpression)
+                                                .getSqlExpression()
+                                                .replace(watermarkRefColumnName, newColumnName);

Review comment: Hi, @wuchong. If the computed column expression contains the renamed column, resolving the schema throws 'Cannot resolve field [oldName], input field list:[xx, newName]'. You said to traverse the node tree to check whether it contains the renamed column, but I don't know where to add that check. If I use a ResolverRule, how do I distinguish it from ReferenceResolverRule? Thanks.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
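A self-contained illustration of why the plain `String.replace` on the watermark expression in the diff above is risky (the column names here are made up for the example): renaming a column whose name is a prefix of another column's name corrupts the expression.

```java
public class RenameReplacePitfall {
    public static void main(String[] args) {
        // hypothetical watermark expression referencing a column named "ts2"
        String watermarkExpr = "ts2 - INTERVAL '5' SECOND";

        // naive rename of a *different* column "ts" to "ts_new" via
        // String.replace also rewrites the unrelated column "ts2"
        String corrupted = watermarkExpr.replace("ts", "ts_new");

        System.out.println(corrupted); // ts_new2 - INTERVAL '5' SECOND
    }
}
```

This is why the reviewer suggested traversing the expression tree and replacing only matching column references, rather than doing textual substitution on the SQL string.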
[GitHub] [flink] flinkbot edited a comment on pull request #15798: [FLINK-22168][table] Fix several shortcomings that prevent schema expressions
flinkbot edited a comment on pull request #15798: URL: https://github.com/apache/flink/pull/15798#issuecomment-828566333 ## CI report: * d02069bb5bff8be1d10a04cc981caa01fb3919ab Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17364) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-22514) TypeExtractor - Improving log message
Avishay Balderman created FLINK-22514:
Summary: TypeExtractor - Improving log message
Key: FLINK-22514
URL: https://issues.apache.org/jira/browse/FLINK-22514
Project: Flink
Issue Type: Improvement
Components: API / DataStream
Reporter: Avishay Balderman

org.apache.flink.api.java.typeutils.TypeExtractor checks whether a field in a class is a "valid POJO field". The method responsible for this check is:
{code:java}
isValidPojoField{code}
When isValidPojoField finds an invalid field, a log message is written (see below), but the message does not say which field is invalid, so the developer has to work out the "bad" field on their own. Adding the field info to the log message is easy and saves the developer time.
{code:java}
for (Field field : fields) {
    Type fieldType = field.getGenericType();
    if (!isValidPojoField(field, clazz, typeHierarchy) && clazz != Row.class) {
        LOG.info("Class " + clazz + " cannot be used as a POJO type because not all fields are valid POJO fields, "
                + "and must be processed as GenericType. Please read the Flink documentation "
                + "on \"Data Types & Serialization\" for details of the effect on performance.");
        return null;
    }
{code}
-- This message was sent by Atlassian Jira (v8.3.4#803005)
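One way the suggested improvement could look. This is a sketch, not the actual Flink change: the helper name, message wording, and sample class are illustrative, and a plain println stands in for the SLF4J logger.

```java
import java.lang.reflect.Field;

public class PojoFieldLogDemo {

    // Hypothetical helper: builds an improved message that names the invalid field.
    static String invalidPojoFieldMessage(Class<?> clazz, Field field) {
        return "Class " + clazz.getName()
                + " cannot be used as a POJO type because field '" + field.getName()
                + "' is not a valid POJO field, and must be processed as GenericType.";
    }

    // Sample class with a field that a POJO analysis would likely reject.
    static class Sample {
        java.io.InputStream stream;
    }

    public static void main(String[] args) throws NoSuchFieldException {
        Field bad = Sample.class.getDeclaredField("stream");
        System.out.println(invalidPojoFieldMessage(Sample.class, bad));
    }
}
```

Including `field.getName()` in the message is the whole fix the ticket asks for: the developer no longer has to inspect every field by hand.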
[jira] [Updated] (FLINK-22514) TypeExtractor - Improving log message
[ https://issues.apache.org/jira/browse/FLINK-22514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Avishay Balderman updated FLINK-22514: -- Priority: Minor (was: Major) > TypeExtractor - Improving log message > -- > > Key: FLINK-22514 > URL: https://issues.apache.org/jira/browse/FLINK-22514 > Project: Flink > Issue Type: Improvement > Components: API / DataStream >Reporter: Avishay Balderman >Priority: Minor > > org.apache.flink.api.java.typeutils.TypeExtractor checks whether a field in a > class is a "valid POJO field". > The method responsible for this check is: > {code:java} > isValidPojoField{code} > When isValidPojoField finds an invalid field, a log message is written (see > below), but the log message does not say which field is invalid... > The developer then has to find the "bad" field on their own. > Adding the field info to the log message is easy and saves the developer time. > > > {code:java} > for (Field field : fields) { >Type fieldType = field.getGenericType(); >if(!isValidPojoField(field, clazz, typeHierarchy) && clazz != Row.class) { > LOG.info("Class " + clazz + " cannot be used as a POJO type because not > all fields are valid POJO fields, " + > "and must be processed as GenericType. Please read the Flink > documentation " + > "on \"Data Types & Serialization\" for details of the effect on > performance."); > return null; >} > {code} > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot commented on pull request #15798: [FLINK-22168][table] Fix several shortcomings that prevent schema expressions
flinkbot commented on pull request #15798: URL: https://github.com/apache/flink/pull/15798#issuecomment-828566333 ## CI report: * d02069bb5bff8be1d10a04cc981caa01fb3919ab UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-22438) add numRecordsOut metric for Async IO
[ https://issues.apache.org/jira/browse/FLINK-22438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-22438: --- Fix Version/s: (was: 1.12.3) 1.12.4 > add numRecordsOut metric for Async IO > - > > Key: FLINK-22438 > URL: https://issues.apache.org/jira/browse/FLINK-22438 > Project: Flink > Issue Type: Bug > Components: Runtime / Metrics, Runtime / Task >Affects Versions: 1.11.3, 1.12.2, 1.13.0 >Reporter: Zhengqi Zhang >Assignee: Zhengqi Zhang >Priority: Major > Labels: pull-request-available > Fix For: 1.14.0, 1.13.1, 1.12.4 > > Attachments: QQ截图20210424004201.png > > Original Estimate: 24h > Remaining Estimate: 24h > > In the Flink WebUI, there is no numRecordsOut metric, and the class > AsyncWaitOperator does not in fact have this metric. Other operators have this > metric, which makes it difficult to monitor the Async IO operator and can cause > confusion for users. > I think we can directly use the wrapping output class CountingOutput to > update the numRecordsOut metric. CountingOutput is used in the superclass of > AsyncWaitOperator (AbstractStreamOperator). > Here is my commit, and I have run a test; it works. > [my > commit|https://github.com/onyourhead/flink/commit/58a8ac27b292280696639caa2e311637cd631a00] -- This message was sent by Atlassian Jira (v8.3.4#803005)
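The idea in the report, wrapping the operator's output so every emitted record bumps a counter, the way AbstractStreamOperator uses CountingOutput, can be sketched with plain Java. The interfaces below are stand-ins, not Flink's actual Output/Counter types:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class CountingOutputSketch {

    // Stand-in for Flink's Output interface (illustrative only).
    interface Output<T> {
        void collect(T record);
    }

    // Decorator that increments numRecordsOut on every collect, then delegates.
    static class CountingOutput<T> implements Output<T> {
        private final Output<T> inner;
        private final AtomicLong numRecordsOut;

        CountingOutput(Output<T> inner, AtomicLong numRecordsOut) {
            this.inner = inner;
            this.numRecordsOut = numRecordsOut;
        }

        @Override
        public void collect(T record) {
            numRecordsOut.incrementAndGet();
            inner.collect(record);
        }
    }

    public static void main(String[] args) {
        List<String> downstream = new ArrayList<>();
        AtomicLong numRecordsOut = new AtomicLong();
        Output<String> output = new CountingOutput<>(downstream::add, numRecordsOut);

        output.collect("a");
        output.collect("b");
        System.out.println(numRecordsOut.get()); // 2
    }
}
```

Because the wrapper is transparent to the operator, the metric stays correct no matter where in the async completion path records are emitted, which is why reusing the existing decorator is simpler than instrumenting AsyncWaitOperator directly.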
[jira] [Updated] (FLINK-22479) [Kinesis][Consumer] Potential lock-up under error condition
[ https://issues.apache.org/jira/browse/FLINK-22479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-22479: --- Fix Version/s: (was: 1.12.3) 1.12.4 > [Kinesis][Consumer] Potential lock-up under error condition > --- > > Key: FLINK-22479 > URL: https://issues.apache.org/jira/browse/FLINK-22479 > Project: Flink > Issue Type: Bug > Components: Connectors / Kinesis >Affects Versions: 1.12.0, 1.12.1, 1.12.2 >Reporter: Danny Cranmer >Assignee: Danny Cranmer >Priority: Critical > Labels: pull-request-available > Fix For: 1.13.0, 1.14.0, 1.12.4 > > > *Background* > This connector has been > [forked|https://github.com/awslabs/amazon-kinesis-connector-flink] by AWS for > use on KDA with Flink 1.11. Bugs have been encountered: > - Under high backpressure scenarios > - When an error is thrown during tear down > *Scope* > Pull in the following fixes from AWS fork: > * Fix issue where {{KinesisDataFetcher.shutdownFetcher()}} hangs > ([issue|https://github.com/awslabs/amazon-kinesis-connector-flink/issues/23], > [pull > request|https://github.com/awslabs/amazon-kinesis-connector-flink/pull/24]) > > * Log error when shutting down Kinesis Data Fetcher > ([issue|https://github.com/awslabs/amazon-kinesis-connector-flink/issues/22], > [pull > request|https://github.com/awslabs/amazon-kinesis-connector-flink/pull/25]) > > * Treating TimeoutException as Recoverable Exception > ([issue|https://github.com/awslabs/amazon-kinesis-connector-flink/pull/28], > [pull > request|https://github.com/awslabs/amazon-kinesis-connector-flink/issues/21]) > > * Add time-out for acquiring subscription and passing events from network to > source thread to prevent deadlock ([pull > request|https://github.com/awslabs/amazon-kinesis-connector-flink/pull/18]) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-22434) Dispatcher does not store suspended jobs in execution graph store
[ https://issues.apache.org/jira/browse/FLINK-22434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-22434: --- Fix Version/s: (was: 1.12.3) 1.12.4 > Dispatcher does not store suspended jobs in execution graph store > - > > Key: FLINK-22434 > URL: https://issues.apache.org/jira/browse/FLINK-22434 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Reporter: Fabian Paul >Assignee: Fabian Paul >Priority: Major > Fix For: 1.11.4, 1.14.0, 1.13.1, 1.12.4 > > > Only globally terminated jobs are currently stored in the execution graph > store after termination. In case the JobManager is shutdown and jobs are > still running, these jobs will be suspended which is a non-globally > terminated state. > The problem surfaces when a user tries to access information about the job > during termination, leading to a job not found response. By storing all > terminated jobs in the execution graph store this should be fixed. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink-jira-bot] sjwiesman closed pull request #11: [hotfix] Make comment language friendly to contributors
sjwiesman closed pull request #11: URL: https://github.com/apache/flink-jira-bot/pull/11 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #15760: [FLINK-19606][table-runtime-blink] Introduce WindowJoinOperator and WindowJoinOperatorBuilder.
flinkbot edited a comment on pull request #15760: URL: https://github.com/apache/flink/pull/15760#issuecomment-826500326 ## CI report: * e7a99f5f45e9acb529b903d7823735f3158756fc Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17348) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #15797: [FLINK-22484][table] Add built-in MAP_KEYS, MAP_VALUES, MAP_FROM_ARRA…
flinkbot edited a comment on pull request #15797: URL: https://github.com/apache/flink/pull/15797#issuecomment-828531728 ## CI report: * 9208f004adafc3997d91921efe3e7d291eb8360f Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17362) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #15631: [FLINK-20103][tests] Add UnalignedCheckpointsStressITCase
flinkbot edited a comment on pull request #15631: URL: https://github.com/apache/flink/pull/15631#issuecomment-820458718 ## CI report: * 64a6f1c0ccfa13b72911e110c52814d6cf41040a Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=16621) * 8db656d8d783b61761d3de7727c0e8831c5e77cd Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17361) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot commented on pull request #15798: [FLINK-22168][table] Fix several shortcomings that prevent schema expressions
flinkbot commented on pull request #15798: URL: https://github.com/apache/flink/pull/15798#issuecomment-828544554 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit a74e82590ac3ed03415f84c4127cfd9597f13ef4 (Wed Apr 28 15:21:11 UTC 2021) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into to Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-22513) Failed to upload compile artifacts
Dawid Wysakowicz created FLINK-22513: Summary: Failed to upload compile artifacts Key: FLINK-22513 URL: https://issues.apache.org/jira/browse/FLINK-22513 Project: Flink Issue Type: Bug Components: Build System / Azure Pipelines Affects Versions: 1.13.0 Reporter: Dawid Wysakowicz https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=17343=logs=52b61abe-a3cc-5bde-cc35-1bbe89bb7df5=5e2d176c-a6d4-5b54-7808-b11714e363ad -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] twalthr opened a new pull request #15798: [FLINK-22168][table] Fix several shortcomings that prevent schema expressions
twalthr opened a new pull request #15798:
URL: https://github.com/apache/flink/pull/15798

## What is the purpose of the change

This fixes a couple of critical bugs in the stack that prevented Table API expressions from being used for schema declaration. Due to time constraints, this PR could not make it into the 1.13 release but should be added to the next bugfix release for a smoother user experience. For testing and consistency, it exposes the Table API expression `sourceWatermark()` in the Java, Scala, and Python APIs.

## Brief change log

Updates at various locations that used `TableSchema` before.

## Verifying this change

This change added tests and can be verified as follows: `DataStreamJavaITCase.testFromAndToChangelogStreamEventTime`

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not applicable

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-11103) Set a default uncaught exception handler
[ https://issues.apache.org/jira/browse/FLINK-11103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17334781#comment-17334781 ] Ashwin Kolhatkar commented on FLINK-11103: -- Ok, I can do that. I'll create a class {{UncaughtExceptionHandler}} as the utility, and use it in the main methods for all the entrypoints. > Set a default uncaught exception handler > > > Key: FLINK-11103 > URL: https://issues.apache.org/jira/browse/FLINK-11103 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Affects Versions: 1.8.0 >Reporter: Nico Kruber >Assignee: Ashwin Kolhatkar >Priority: Major > Labels: stale-major, starter, usability > > We should set a default uncaught exception handler in Flink via > {{Thread.setDefaultUncaughtExceptionHandler()}} which at least logs the > exceptions. Ideally, we would even fail the job (could make this > configurable) but users may have some ill-behaving threads, e.g. through > libraries, which they would want to tolerate and we don't want to change > behaviour now. > (FLINK-5232 added this for the JobManager, we need it for the TaskManager) -- This message was sent by Atlassian Jira (v8.3.4#803005)
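A minimal, self-contained sketch of what the proposed utility could do (the class name, message wording, and use of println instead of a logger are illustrative, not the eventual Flink implementation):

```java
public class UncaughtExceptionHandlerDemo {

    // Hypothetical utility: installs a process-wide handler that logs the failure.
    static void installDefaultHandler() {
        Thread.setDefaultUncaughtExceptionHandler(
                (thread, error) ->
                        System.out.println(
                                "Uncaught exception in thread '" + thread.getName() + "': " + error));
    }

    public static void main(String[] args) throws InterruptedException {
        installDefaultHandler();

        // Any thread without its own handler now falls back to the default one.
        Thread worker = new Thread(() -> { throw new RuntimeException("boom"); }, "worker");
        worker.start();
        worker.join();
    }
}
```

Setting the handler once in each entrypoint's main method, as the comment suggests, guarantees that threads spawned later by libraries inherit at least log-on-failure behavior.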
[GitHub] [flink] flinkbot commented on pull request #15797: [FLINK-22484][table] Add built-in MAP_KEYS, MAP_VALUES, MAP_FROM_ARRA…
flinkbot commented on pull request #15797: URL: https://github.com/apache/flink/pull/15797#issuecomment-828531728 ## CI report: * 9208f004adafc3997d91921efe3e7d291eb8360f UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] dannycranmer merged pull request #15774: [FLINK-22479][Kinesis][Consumer] Potential lock-up under error condition (Flink 1.14)
dannycranmer merged pull request #15774: URL: https://github.com/apache/flink/pull/15774 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #15631: [FLINK-20103][tests] Add UnalignedCheckpointsStressITCase
flinkbot edited a comment on pull request #15631: URL: https://github.com/apache/flink/pull/15631#issuecomment-820458718 ## CI report: * 64a6f1c0ccfa13b72911e110c52814d6cf41040a Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=16621) * 8db656d8d783b61761d3de7727c0e8831c5e77cd UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot commented on pull request #15797: [FLINK-22484][table] Add built-in MAP_KEYS, MAP_VALUES, MAP_FROM_ARRA…
flinkbot commented on pull request #15797: URL: https://github.com/apache/flink/pull/15797#issuecomment-828524777 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit 9208f004adafc3997d91921efe3e7d291eb8360f (Wed Apr 28 14:57:04 UTC 2021) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! * **This pull request references an unassigned [Jira ticket](https://issues.apache.org/jira/browse/FLINK-22484).** According to the [code contribution guide](https://flink.apache.org/contributing/contribute-code.html), tickets need to be assigned before starting with the implementation work. Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into to Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. 
For consensus, approval by a Flink committer or PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[jira] [Comment Edited] (FLINK-22452) Support specifying custom transactional.id prefix in FlinkKafkaProducer
[ https://issues.apache.org/jira/browse/FLINK-22452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17334774#comment-17334774 ] Marc Demierre edited comment on FLINK-22452 at 4/28/21, 2:54 PM: - I just hit this issue at my company today, due to the mentioned [Prefixed ACLs|https://docs.confluent.io/platform/current/kafka/authorization.html#prefixed-acls] on our shared Kafka cluster. They were enabled on a multi-tenant Kafka to avoid clashes between different tenants. I guess it could be a relatively common scenario. What are the next steps on this? Following the discussions at https://issues.apache.org/jira/browse/FLINK-11654, is a FLIP being opened? I would be available to help if needed. was (Author: mdemierre): I just hit this issue at my company today, due to the mentioned [Prefixed ACLs|https://docs.confluent.io/platform/current/kafka/authorization.html#prefixed-acls] on our shared Kafka cluster. They were enabled on a multi-tenant to avoid clashes between different tenants. I guess it could be a relatively common scenario. What are the next steps on this? Following the discussions at https://issues.apache.org/jira/browse/FLINK-11654, is a FLIP being opened? I would be available to help if needed. > Support specifying custom transactional.id prefix in FlinkKafkaProducer > --- > > Key: FLINK-22452 > URL: https://issues.apache.org/jira/browse/FLINK-22452 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Affects Versions: 1.12.2 >Reporter: Wenhao Ji >Priority: Major > > Currently, the "transactional.id"s of the Kafka producers in > FlinkKafkaProducer are generated based on the task name. This mechanism has > some limitations: > * It will exceed Kafka's limitation if the task name is too long. (resolved > in FLINK-17691) > * They will very likely clash each other if the job topologies are similar. 
> (discussed in FLINK-11654) > * Only certain "transactional.id" may be authorized by [Prefixed > ACLs|https://docs.confluent.io/platform/current/kafka/authorization.html#prefixed-acls] > on the target Kafka cluster. > Besides, the spring community has introduced the > [setTransactionIdPrefix|https://docs.spring.io/spring-kafka/api/org/springframework/kafka/core/DefaultKafkaProducerFactory.html#setTransactionIdPrefix(java.lang.String)] > method to their Kafka client. > Therefore, I think it will be necessary to have this feature in the Flink > Kafka connector. > > As discussed in FLINK-11654, the possible solution will be, > * either introduce an additional method called > setTransactionalIdPrefix(String) in the FlinkKafkaProducer, > * or use the existing "transactional.id" properties as the prefix. > And the behavior of the "transactional.id" generation will be > * keep the behavior as it was if absent, > * use the one if present as the prefix for the TransactionalIdsGenerator. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-22484) Built-in functions for collections
[ https://issues.apache.org/jira/browse/FLINK-22484?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-22484: --- Labels: pull-request-available (was: ) > Built-in functions for collections > - > > Key: FLINK-22484 > URL: https://issues.apache.org/jira/browse/FLINK-22484 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Sergey Nuyanzin >Priority: Major > Labels: pull-request-available > > A number of built-in functions for working with collections are > supported by other vendors. After looking at PostgreSQL, BigQuery, and Spark, > a list of more or less generic functions for collections was selected > (for more details see [1]). > Feedback on the doc is welcome > [1] > [https://docs.google.com/document/d/1nS0Faur9CCop4sJoQ2kMQ2XU1hjg1FaiTSQp2RsZKEE/edit?usp=sharing]
[GitHub] [flink] snuyanzin opened a new pull request #15797: [FLINK-22484][table] Add built-in MAP_KEYS, MAP_VALUES, MAP_FROM_ARRA…
snuyanzin opened a new pull request #15797: URL: https://github.com/apache/flink/pull/15797 ## What is the purpose of the change The PR implements three functions from the list mentioned in FLINK-22484: `MAP_KEYS`, `MAP_VALUES`, `MAP_FROM_ARRAYS` ## Brief change log Implementation, tests and docs for `MAP_KEYS`, `MAP_VALUES`, `MAP_FROM_ARRAYS` ## Verifying this change This change added tests and can be verified as follows: * `CollectionFunctionsITCase` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? docs / JavaDocs
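As a rough illustration of the semantics these functions are described as providing, here is a plain-Java sketch. This is not the Flink implementation from the PR; the class `CollectionFunctionsSketch`, its method names, and the edge-case behavior (e.g. rejecting mismatched array lengths) are assumptions for illustration only.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of what MAP_KEYS, MAP_VALUES and MAP_FROM_ARRAYS
// compute, per the FLINK-22484 description. Not the actual Flink code;
// names and edge-case behavior here are illustrative assumptions.
public class CollectionFunctionsSketch {

    // MAP_FROM_ARRAYS(keys, values): pair up two equal-length arrays.
    public static <K, V> Map<K, V> mapFromArrays(K[] keys, V[] values) {
        if (keys.length != values.length) {
            throw new IllegalArgumentException(
                    "keys and values must have the same length");
        }
        Map<K, V> map = new LinkedHashMap<>();
        for (int i = 0; i < keys.length; i++) {
            map.put(keys[i], values[i]);
        }
        return map;
    }

    // MAP_KEYS(map): the map's keys, here returned as a list.
    public static <K, V> List<K> mapKeys(Map<K, V> map) {
        return new ArrayList<>(map.keySet());
    }

    // MAP_VALUES(map): the map's values, here returned as a list.
    public static <K, V> List<V> mapValues(Map<K, V> map) {
        return new ArrayList<>(map.values());
    }

    public static void main(String[] args) {
        Map<String, Integer> m =
                mapFromArrays(new String[] {"a", "b"}, new Integer[] {1, 2});
        System.out.println(mapKeys(m));   // [a, b]
        System.out.println(mapValues(m)); // [1, 2]
    }
}
```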
[jira] [Commented] (FLINK-22452) Support specifying custom transactional.id prefix in FlinkKafkaProducer
[ https://issues.apache.org/jira/browse/FLINK-22452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17334774#comment-17334774 ] Marc Demierre commented on FLINK-22452: --- I just hit this issue at my company today, due to the mentioned [Prefixed ACLs|https://docs.confluent.io/platform/current/kafka/authorization.html#prefixed-acls] on our shared Kafka cluster. They were enabled on a multi-tenant Kafka to avoid clashes between different tenants. I guess it could be a relatively common scenario. What are the next steps on this? Following the discussions at https://issues.apache.org/jira/browse/FLINK-11654, is a FLIP being opened? I would be available to help if needed. > Support specifying custom transactional.id prefix in FlinkKafkaProducer > --- > > Key: FLINK-22452 > URL: https://issues.apache.org/jira/browse/FLINK-22452 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Affects Versions: 1.12.2 >Reporter: Wenhao Ji >Priority: Major > > Currently, the "transactional.id"s of the Kafka producers in > FlinkKafkaProducer are generated based on the task name. This mechanism has > some limitations: > * It will exceed Kafka's limitation if the task name is too long. (resolved > in FLINK-17691) > * They will very likely clash with each other if the job topologies are similar. > (discussed in FLINK-11654) > * Only certain "transactional.id" may be authorized by [Prefixed > ACLs|https://docs.confluent.io/platform/current/kafka/authorization.html#prefixed-acls] > on the target Kafka cluster. > Besides, the Spring community has introduced the > [setTransactionIdPrefix|https://docs.spring.io/spring-kafka/api/org/springframework/kafka/core/DefaultKafkaProducerFactory.html#setTransactionIdPrefix(java.lang.String)] > method to their Kafka client. > Therefore, I think it will be necessary to have this feature in the Flink > Kafka connector. 
> > As discussed in FLINK-11654, the possible solution will be, > * either introduce an additional method called > setTransactionalIdPrefix(String) in the FlinkKafkaProducer, > * or use the existing "transactional.id" properties as the prefix. > And the behavior of the "transactional.id" generation will be > * keep the behavior as it was if absent, > * use the one if present as the prefix for the TransactionalIdsGenerator.
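The prefix-based generation proposed in the ticket above could be sketched roughly as follows. This is a minimal illustration only: `TransactionalIdPrefixer` and its methods are hypothetical names invented here, not Flink's actual `TransactionalIdsGenerator` API or the shape any eventual FLIP would take.

```java
import java.util.Objects;

// Hypothetical sketch of prefix-based transactional.id generation.
// Class and method names are illustrative, not Flink API.
public class TransactionalIdPrefixer {

    private final String prefix;

    public TransactionalIdPrefixer(String prefix) {
        this.prefix = Objects.requireNonNull(prefix);
    }

    // Derive a deterministic transactional.id for one producer subtask.
    // Keeping the user-configured prefix first is what would let
    // Prefixed ACLs authorize every id the job can generate.
    public String idFor(int subtaskIndex, int attempt) {
        return prefix + "-" + subtaskIndex + "-" + attempt;
    }

    public static void main(String[] args) {
        TransactionalIdPrefixer gen = new TransactionalIdPrefixer("my-team.my-app");
        System.out.println(gen.idFor(0, 1)); // my-team.my-app-0-1
    }
}
```

With something like this, a Kafka ACL granting prefixed access to `my-team.my-app` would cover all ids the job derives, whereas ids derived from the task name cannot be matched by a tenant-scoped prefix.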
[GitHub] [flink] flinkbot edited a comment on pull request #15712: [FLINK-22400][hive connect]fix NPE problem when convert flink object for Map
flinkbot edited a comment on pull request #15712: URL: https://github.com/apache/flink/pull/15712#issuecomment-824189664 ## CI report: * 8256b986bcfce17787558b17bba6c2c6a9be24c2 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17342) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] flinkbot edited a comment on pull request #15479: [FLINK-19606][table-runtime-blink] Introduce StreamExecWindowJoin and window join it cases
flinkbot edited a comment on pull request #15479: URL: https://github.com/apache/flink/pull/15479#issuecomment-812344008 ## CI report: * 3764b47414eba44d4ea2f53b6973fe59f3356cfc Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17359) Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17045) * 1b562e2eec41f48d9ac9b4e7591240b6c8d99e61 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17360) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] flinkbot edited a comment on pull request #15121: The method $(String) is undefined for the type TableExample
flinkbot edited a comment on pull request #15121: URL: https://github.com/apache/flink/pull/15121#issuecomment-793480240 ## CI report: * cf26ce895a7956d258acc073817f578558e78227 UNKNOWN * 709c4110370b845f42d916a06e56c6026cf2fac8 UNKNOWN * 4ea99332e1997eebca1f3f0a9d9229b8265fe32c UNKNOWN * 9a256ec6b644a0f795376076f4e3ff4e85aaad1e UNKNOWN * b7dca63a2e98f09c8c780a21091b5268d897b220 UNKNOWN * 2b7bb05a4dbbb0883db566589dfe1975fdbb279d UNKNOWN * 60b3b4f14b78889b827c5e573b1d4ef992791d32 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17324) * 00d3356d86576e2a129405a92f9ebb2acb5d6412 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17358) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] pnowojski commented on pull request #15631: [FLINK-20103][tests] Add UnalignedCheckpointsStressITCase
pnowojski commented on pull request #15631: URL: https://github.com/apache/flink/pull/15631#issuecomment-828499112 I have addressed the feedback. I have also included a couple of extra commits from @AHeise, so please pay special attention to those.
[GitHub] [flink] pnowojski commented on a change in pull request #15631: [FLINK-20103][tests] Add UnalignedCheckpointsStressITCase
pnowojski commented on a change in pull request #15631: URL: https://github.com/apache/flink/pull/15631#discussion_r67839 ## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointStressITCase.java ## @@ -0,0 +1,553 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.CheckpointListener; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.runtime.operators.testutils.ExpectedTestException; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.datastream.DataStreamUtils; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction; +import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; +import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import 
org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.util.Collector; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import javax.annotation.Nullable; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Comparator; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ThreadLocalRandom; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.flink.configuration.CheckpointingOptions.CHECKPOINTS_DIRECTORY; +import static org.apache.flink.configuration.CheckpointingOptions.MAX_RETAINED_CHECKPOINTS; +import static org.apache.flink.shaded.guava18.com.google.common.collect.Iterables.getOnlyElement; +import static org.apache.flink.test.util.TestUtils.submitJobAndWaitForResult; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * A stress test that runs for a pre-defined amount of time, verifying data correctness and every + * couple of checkpoints is triggering fail over to stress test unaligned checkpoints. + */ +@SuppressWarnings("serial") +public class UnalignedCheckpointStressITCase extends TestLogger { + +protected static final int CHECKPOINT_INTERVAL = 20; +protected static final int MINIMUM_COMPLETED_CHECKPOINTS_BETWEEN_FAILURES = 2; +protected static final int MAXIMUM_COMPLETED_CHECKPOINTS_BETWEEN_FAILURES = 10; +protected static final long TEST_DURATION = Time.minutes(1).toMilliseconds(); +protected static final int
[GitHub] [flink] pnowojski commented on a change in pull request #15631: [FLINK-20103][tests] Add UnalignedCheckpointsStressITCase
pnowojski commented on a change in pull request #15631: URL: https://github.com/apache/flink/pull/15631#discussion_r63090 ## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointStressITCase.java ## @@ -0,0 +1,553 @@ (same hunk as above)
[GitHub] [flink] pnowojski commented on a change in pull request #15631: [FLINK-20103][tests] Add UnalignedCheckpointsStressITCase
pnowojski commented on a change in pull request #15631: URL: https://github.com/apache/flink/pull/15631#discussion_r60261 ## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointStressITCase.java ## @@ -0,0 +1,553 @@ (same hunk as above) +protected static final long TEST_DURATION = Time.minutes(1).toMilliseconds(); Review comment: Depending on machine