[jira] [Commented] (FLINK-22387) UpsertKafkaTableITCase hangs when setting up kafka

2021-04-28 Thread Guowei Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17335158#comment-17335158
 ] 

Guowei Ma commented on FLINK-22387:
---

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=17371=logs=72d4811f-9f0d-5fd0-014a-0bc26b72b642=c1d93a6a-ba91-515d-3196-2ee8019fbda7=6899

> UpsertKafkaTableITCase hangs when setting up kafka
> --
>
> Key: FLINK-22387
> URL: https://issues.apache.org/jira/browse/FLINK-22387
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka, Table SQL / Ecosystem
>Affects Versions: 1.13.0
>Reporter: Dawid Wysakowicz
>Priority: Critical
>  Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=16901=logs=c5f0071e-1851-543e-9a45-9ac140befc32=1fb1a56f-e8b5-5a82-00a0-a2db7757b4f5=6932
> {code}
> 2021-04-20T20:01:32.2276988Z Apr 20 20:01:32 "main" #1 prio=5 os_prio=0 
> tid=0x7fe87400b000 nid=0x4028 runnable [0x7fe87df22000]
> 2021-04-20T20:01:32.2277666Z Apr 20 20:01:32java.lang.Thread.State: 
> RUNNABLE
> 2021-04-20T20:01:32.2278338Z Apr 20 20:01:32  at 
> org.testcontainers.shaded.okio.Buffer.getByte(Buffer.java:312)
> 2021-04-20T20:01:32.2279325Z Apr 20 20:01:32  at 
> org.testcontainers.shaded.okio.RealBufferedSource.readHexadecimalUnsignedLong(RealBufferedSource.java:310)
> 2021-04-20T20:01:32.2280656Z Apr 20 20:01:32  at 
> org.testcontainers.shaded.okhttp3.internal.http1.Http1ExchangeCodec$ChunkedSource.readChunkSize(Http1ExchangeCodec.java:492)
> 2021-04-20T20:01:32.2281603Z Apr 20 20:01:32  at 
> org.testcontainers.shaded.okhttp3.internal.http1.Http1ExchangeCodec$ChunkedSource.read(Http1ExchangeCodec.java:471)
> 2021-04-20T20:01:32.2282163Z Apr 20 20:01:32  at 
> org.testcontainers.shaded.okhttp3.internal.Util.skipAll(Util.java:204)
> 2021-04-20T20:01:32.2282870Z Apr 20 20:01:32  at 
> org.testcontainers.shaded.okhttp3.internal.Util.discard(Util.java:186)
> 2021-04-20T20:01:32.2283494Z Apr 20 20:01:32  at 
> org.testcontainers.shaded.okhttp3.internal.http1.Http1ExchangeCodec$ChunkedSource.close(Http1ExchangeCodec.java:511)
> 2021-04-20T20:01:32.2284460Z Apr 20 20:01:32  at 
> org.testcontainers.shaded.okio.ForwardingSource.close(ForwardingSource.java:43)
> 2021-04-20T20:01:32.2285183Z Apr 20 20:01:32  at 
> org.testcontainers.shaded.okhttp3.internal.connection.Exchange$ResponseBodySource.close(Exchange.java:313)
> 2021-04-20T20:01:32.2285756Z Apr 20 20:01:32  at 
> org.testcontainers.shaded.okio.RealBufferedSource.close(RealBufferedSource.java:476)
> 2021-04-20T20:01:32.2286287Z Apr 20 20:01:32  at 
> org.testcontainers.shaded.okhttp3.internal.Util.closeQuietly(Util.java:139)
> 2021-04-20T20:01:32.2286795Z Apr 20 20:01:32  at 
> org.testcontainers.shaded.okhttp3.ResponseBody.close(ResponseBody.java:192)
> 2021-04-20T20:01:32.2287270Z Apr 20 20:01:32  at 
> org.testcontainers.shaded.okhttp3.Response.close(Response.java:290)
> 2021-04-20T20:01:32.2287913Z Apr 20 20:01:32  at 
> org.testcontainers.shaded.com.github.dockerjava.okhttp.OkDockerHttpClient$OkResponse.close(OkDockerHttpClient.java:285)
> 2021-04-20T20:01:32.2288606Z Apr 20 20:01:32  at 
> org.testcontainers.shaded.com.github.dockerjava.core.DefaultInvocationBuilder.lambda$null$0(DefaultInvocationBuilder.java:272)
> 2021-04-20T20:01:32.2289295Z Apr 20 20:01:32  at 
> org.testcontainers.shaded.com.github.dockerjava.core.DefaultInvocationBuilder$$Lambda$340/2058508175.close(Unknown
>  Source)
> 2021-04-20T20:01:32.2289886Z Apr 20 20:01:32  at 
> com.github.dockerjava.api.async.ResultCallbackTemplate.close(ResultCallbackTemplate.java:77)
> 2021-04-20T20:01:32.2290567Z Apr 20 20:01:32  at 
> org.testcontainers.utility.ResourceReaper.start(ResourceReaper.java:202)
> 2021-04-20T20:01:32.2291051Z Apr 20 20:01:32  at 
> org.testcontainers.DockerClientFactory.client(DockerClientFactory.java:205)
> 2021-04-20T20:01:32.2291879Z Apr 20 20:01:32  - locked <0xe9cd50f8> 
> (a [Ljava.lang.Object;)
> 2021-04-20T20:01:32.2292313Z Apr 20 20:01:32  at 
> org.testcontainers.LazyDockerClient.getDockerClient(LazyDockerClient.java:14)
> 2021-04-20T20:01:32.2292870Z Apr 20 20:01:32  at 
> org.testcontainers.LazyDockerClient.authConfig(LazyDockerClient.java:12)
> 2021-04-20T20:01:32.2293383Z Apr 20 20:01:32  at 
> org.testcontainers.containers.GenericContainer.start(GenericContainer.java:310)
> 2021-04-20T20:01:32.2293890Z Apr 20 20:01:32  at 
> org.testcontainers.containers.GenericContainer.starting(GenericContainer.java:1029)
> 2021-04-20T20:01:32.2294578Z Apr 20 20:01:32  at 
> org.testcontainers.containers.FailureDetectingExternalResource$1.evaluate(FailureDetectingExternalResource.java:29)
> 2021-04-20T20:01:32.2295157Z Apr 20 20:01:32  at 
> 

[GitHub] [flink] flinkbot edited a comment on pull request #15797: [FLINK-22484][table] Add built-in MAP_KEYS, MAP_VALUES, MAP_FROM_ARRA…

2021-04-28 Thread GitBox


flinkbot edited a comment on pull request #15797:
URL: https://github.com/apache/flink/pull/15797#issuecomment-828531728


   
   ## CI report:
   
   * 32b72459e66decd79269d3718f1ee40c3d765775 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17372)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (FLINK-22285) YARNHighAvailabilityITCase hangs

2021-04-28 Thread Guowei Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17335141#comment-17335141
 ] 

Guowei Ma edited comment on FLINK-22285 at 4/29/21, 5:44 AM:
-

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=17356=logs=fc5181b0-e452-5c8f-68de-1097947f6483=6b04ca5f-0b52-511d-19c9-52bf0d9fbdfa=26030

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=17353=logs=fc5181b0-e452-5c8f-68de-1097947f6483=6b04ca5f-0b52-511d-19c9-52bf0d9fbdfa=26429


was (Author: maguowei):
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=17356=logs=fc5181b0-e452-5c8f-68de-1097947f6483=6b04ca5f-0b52-511d-19c9-52bf0d9fbdfa=26030

> YARNHighAvailabilityITCase hangs
> 
>
> Key: FLINK-22285
> URL: https://issues.apache.org/jira/browse/FLINK-22285
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN, Runtime / Coordination
>Affects Versions: 1.11.3, 1.12.2
>Reporter: Guowei Ma
>Priority: Major
>  Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=16568=logs=8fd975ef-f478-511d-4997-6f15fe8a1fd3=ac0fa443-5d45-5a6b-3597-0310ecc1d2ab=31437



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22499) JDBC sink table-api support "sink.parallelism" ?

2021-04-28 Thread Shengkai Fang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17335142#comment-17335142
 ] 

Shengkai Fang commented on FLINK-22499:
---

This feature is supported by FLINK-19942. You may cherry-pick it to your 
branch.
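
For reference, a minimal PyFlink sketch of what the sink DDL could look like once that feature is available on your branch. Only the 'sink.parallelism' option itself comes from FLINK-19942; the schema, table name and connection settings below are made-up assumptions for illustration.

{code:python}
from pyflink.table import EnvironmentSettings, TableEnvironment

# Sketch only: assumes the flink-connector-jdbc jar and a MySQL driver are on
# the classpath, and that the 'sink.parallelism' option (FLINK-19942) exists.
t_env = TableEnvironment.create(
    EnvironmentSettings.new_instance().in_streaming_mode().build())

t_env.execute_sql("""
    CREATE TABLE orders_sink (
        order_id BIGINT,
        amount DOUBLE
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://localhost:3306/mydb',
        'table-name' = 'orders',
        'username' = 'user',
        'password' = 'pass',
        'sink.parallelism' = '4'
    )
""")
{code}

With such a definition the sink operator runs with parallelism 4, independently of the parallelism of the upstream operators.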

> JDBC  sink table-api  support "sink.parallelism" ?
> --
>
> Key: FLINK-22499
> URL: https://issues.apache.org/jira/browse/FLINK-22499
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / JDBC
>Affects Versions: 1.12.1
>Reporter: ranqiqiang
>Priority: Major
>
> I receive messages from Kafka and then sink them to MySQL/TiDB through the SQL 
> Table API.
> How can I change the sink parallelism? 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22285) YARNHighAvailabilityITCase hangs

2021-04-28 Thread Guowei Ma (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22285?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guowei Ma updated FLINK-22285:
--
Affects Version/s: 1.11.3

> YARNHighAvailabilityITCase hangs
> 
>
> Key: FLINK-22285
> URL: https://issues.apache.org/jira/browse/FLINK-22285
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN, Runtime / Coordination
>Affects Versions: 1.11.3, 1.12.2
>Reporter: Guowei Ma
>Priority: Major
>  Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=16568=logs=8fd975ef-f478-511d-4997-6f15fe8a1fd3=ac0fa443-5d45-5a6b-3597-0310ecc1d2ab=31437



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22285) YARNHighAvailabilityITCase hangs

2021-04-28 Thread Guowei Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17335141#comment-17335141
 ] 

Guowei Ma commented on FLINK-22285:
---

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=17356=logs=fc5181b0-e452-5c8f-68de-1097947f6483=6b04ca5f-0b52-511d-19c9-52bf0d9fbdfa=26030

> YARNHighAvailabilityITCase hangs
> 
>
> Key: FLINK-22285
> URL: https://issues.apache.org/jira/browse/FLINK-22285
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN, Runtime / Coordination
>Affects Versions: 1.12.2
>Reporter: Guowei Ma
>Priority: Major
>  Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=16568=logs=8fd975ef-f478-511d-4997-6f15fe8a1fd3=ac0fa443-5d45-5a6b-3597-0310ecc1d2ab=31437



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22520) KafkaSourceLegacyITCase.testMultipleSourcesOnePartition hangs

2021-04-28 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-22520:
-

 Summary: KafkaSourceLegacyITCase.testMultipleSourcesOnePartition 
hangs
 Key: FLINK-22520
 URL: https://issues.apache.org/jira/browse/FLINK-22520
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.14.0
Reporter: Guowei Ma


There are no error messages.
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=17363=logs=c5f0071e-1851-543e-9a45-9ac140befc32=1fb1a56f-e8b5-5a82-00a0-a2db7757b4f5=42023


{code:java}
"main" #1 prio=5 os_prio=0 tid=0x7f4d3400b000 nid=0x203f waiting on 
condition [0x7f4d3be2e000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0xa68f3b68> (a 
java.util.concurrent.CompletableFuture$Signaller)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at 
java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
at 
java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
at 
java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at org.apache.flink.test.util.TestUtils.tryExecute(TestUtils.java:49)
at 
org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.runMultipleSourcesOnePartitionExactlyOnceTest(KafkaConsumerTestBase.java:1112)
at 
org.apache.flink.connector.kafka.source.KafkaSourceLegacyITCase.testMultipleSourcesOnePartition(KafkaSourceLegacyITCase.java:87)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)

{code}




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22503) getting error on select query in flink sql

2021-04-28 Thread Shengkai Fang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17335137#comment-17335137
 ] 

Shengkai Fang commented on FLINK-22503:
---

Hi. This is the expected error. Please run {{SET 
execution.result-mode=tableau;}} before SELECT in batch mode. Please refer to 
[1] for more details.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sqlClient.html#running-sql-queries


> getting error on select query in flink sql 
> ---
>
> Key: FLINK-22503
> URL: https://issues.apache.org/jira/browse/FLINK-22503
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.12.2
> Environment: Hi Team,
> I added a source table *orders* with the filesystem connector using a CSV file. 
> When I run select * from orders, it throws an error. Even after I changed the 
> mode, it still does not work.
>  !image-2021-04-28-13-33-36-784.png! 
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.client.gateway.SqlExecutionException: Results of batch 
> queries can only be served in table or tableau mode
> 2) I used Elasticsearch as a sink, but I am not able to see the data in the 
> Elasticsearch index. Can you please check and help me with these two issues?
>  !image-2021-04-28-13-40-42-375.png! 
> !image-2021-04-28-13-43-00-455.png! 
>Reporter: Bhagi
>Priority: Major
> Attachments: image-2021-04-28-13-33-36-784.png, 
> image-2021-04-28-13-40-42-375.png, image-2021-04-28-13-43-00-455.png
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-22503) getting error on select query in flink sql

2021-04-28 Thread Shengkai Fang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shengkai Fang closed FLINK-22503.
-
Resolution: Not A Bug

> getting error on select query in flink sql 
> ---
>
> Key: FLINK-22503
> URL: https://issues.apache.org/jira/browse/FLINK-22503
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.12.2
> Environment: Hi Team,
> I added a source table *orders* with the filesystem connector using a CSV file. 
> When I run select * from orders, it throws an error. Even after I changed the 
> mode, it still does not work.
>  !image-2021-04-28-13-33-36-784.png! 
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.client.gateway.SqlExecutionException: Results of batch 
> queries can only be served in table or tableau mode
> 2) I used Elasticsearch as a sink, but I am not able to see the data in the 
> Elasticsearch index. Can you please check and help me with these two issues?
>  !image-2021-04-28-13-40-42-375.png! 
> !image-2021-04-28-13-43-00-455.png! 
>Reporter: Bhagi
>Priority: Major
> Attachments: image-2021-04-28-13-33-36-784.png, 
> image-2021-04-28-13-40-42-375.png, image-2021-04-28-13-43-00-455.png
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22503) getting error on select query in flink sql

2021-04-28 Thread Shengkai Fang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shengkai Fang updated FLINK-22503:
--
Component/s: (was: Table SQL / API)
 Table SQL / Client

> getting error on select query in flink sql 
> ---
>
> Key: FLINK-22503
> URL: https://issues.apache.org/jira/browse/FLINK-22503
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.12.2
> Environment: Hi Team,
> I added a source table *orders* with the filesystem connector using a CSV file. 
> When I run select * from orders, it throws an error. Even after I changed the 
> mode, it still does not work.
>  !image-2021-04-28-13-33-36-784.png! 
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.client.gateway.SqlExecutionException: Results of batch 
> queries can only be served in table or tableau mode
> 2) I used Elasticsearch as a sink, but I am not able to see the data in the 
> Elasticsearch index. Can you please check and help me with these two issues?
>  !image-2021-04-28-13-40-42-375.png! 
> !image-2021-04-28-13-43-00-455.png! 
>Reporter: Bhagi
>Priority: Major
> Attachments: image-2021-04-28-13-33-36-784.png, 
> image-2021-04-28-13-40-42-375.png, image-2021-04-28-13-43-00-455.png
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] YikSanChan closed pull request #15404: [hotfix][docs] Remove redundant import

2021-04-28 Thread GitBox


YikSanChan closed pull request #15404:
URL: https://github.com/apache/flink/pull/15404


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Closed] (FLINK-20086) Add documentation for the open method of UserDefinedFunction

2021-04-28 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20086?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu closed FLINK-20086.
---
Fix Version/s: (was: 1.11.4)
   (was: 1.13.0)
   1.13.1
   1.14.0
   Resolution: Fixed

Merged to
- master via c593209b664af0d112b22c386266476f3a2ee750
- release-1.13 via 8ffefdaed5f2ac22a9b720dc5d0293b7be88a0d4

> Add documentation for the open method of UserDefinedFunction
> 
>
> Key: FLINK-20086
> URL: https://issues.apache.org/jira/browse/FLINK-20086
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python, Documentation
>Reporter: Dian Fu
>Assignee: Yik San Chan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0, 1.13.1
>
>
> According to the questions asked by PyFlink users so far, many users are not 
> aware that there is an open method in UserDefinedFunction where they could 
> perform initialization work. This method is especially useful for ML users, 
> who could use it to perform ML model initialization.
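
A minimal PyFlink sketch of the kind of example such documentation could show. Only the open()/eval() structure reflects the UserDefinedFunction API; the pickled model and its path are made-up assumptions for illustration.

{code:python}
from pyflink.table import DataTypes
from pyflink.table.udf import ScalarFunction, udf


class Predict(ScalarFunction):

    def open(self, function_context):
        # Called once before any eval() call, so heavy initialization such as
        # loading an ML model belongs here (the path and model are hypothetical).
        import pickle
        with open("/tmp/model.pkl", "rb") as f:
            self._model = pickle.load(f)

    def eval(self, feature):
        # Each record reuses the model loaded once in open().
        return float(self._model.predict([[feature]])[0])


predict = udf(Predict(), result_type=DataTypes.DOUBLE())
{code}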



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-20086) Add documentation for the open method of UserDefinedFunction

2021-04-28 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20086?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu reassigned FLINK-20086:
---

Assignee: Yik San Chan

> Add documentation for the open method of UserDefinedFunction
> 
>
> Key: FLINK-20086
> URL: https://issues.apache.org/jira/browse/FLINK-20086
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python, Documentation
>Reporter: Dian Fu
>Assignee: Yik San Chan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.4, 1.13.0
>
>
> According to the questions asked by PyFlink users so far, many users are not 
> aware that there is an open method in UserDefinedFunction where they could 
> perform initialization work. This method is especially useful for ML users, 
> who could use it to perform ML model initialization.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] dianfu closed pull request #15795: [FLINK-20086][docs] Override UserDefinedFunction open to load resources

2021-04-28 Thread GitBox


dianfu closed pull request #15795:
URL: https://github.com/apache/flink/pull/15795


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (FLINK-17826) Add missing custom query support on new jdbc connector

2021-04-28 Thread Leonard Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17335131#comment-17335131
 ] 

Leonard Xu edited comment on FLINK-17826 at 4/29/21, 4:02 AM:
--

Hi, [~f.pompermaier] 

The concerns about the custom query are:
(1) After adding a custom query, the JDBC table is semantically a view over 
JDBC (e.g. select * from db1.A join db1.B) rather than a table; the 
'table-name' option is useless in this case.

(2) Such a table can probably only be used as a source table and not as a sink 
table, as in the example above.

Based on these, I tend not to add this unclear option.

But if you're interested in this issue, you can take it over and I can help 
review and merge. We can add a specification for (1) and more checks for (2).

 


was (Author: leonard xu):
Hi, [~f.pompermaier] 

The concerns about the custom query are:
(1) After adding a custom query, the JDBC table is semantically a view over 
JDBC (e.g. select * from db1.A join db1.B) rather than a table.

(2) Such a table can probably only be used as a source table and not as a sink 
table, as in the example above.

Based on these, I tend not to add this unclear option.

But if you're interested in this issue, you can take it over and I can help 
review and merge. We can add a specification for (1) and more checks for (2).

 

> Add missing custom query support on new jdbc connector
> --
>
> Key: FLINK-17826
> URL: https://issues.apache.org/jira/browse/FLINK-17826
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Reporter: Jark Wu
>Assignee: Leonard Xu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.13.0
>
>
> In FLINK-17361, we added custom query support on JDBC tables, but missed adding 
> the same ability to the new JDBC connector (i.e. 
> {{JdbcDynamicTableSourceSinkFactory}}). 
> In the new JDBC connector, maybe we should call it {{scan.query}} to keep it 
> consistent with other scan options. Besides, we need to make {{"table-name"}} 
> optional, but add validation that "table-name" and "scan.query" shouldn't 
> both be empty, and that "table-name" must not be empty when used as a sink.
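
To illustrate the proposal, a hypothetical PyFlink sketch is shown below. The {{scan.query}} option does not exist in any released JDBC connector; it is only the naming suggested in this ticket, and the schema and connection settings are invented for illustration.

{code:python}
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(
    EnvironmentSettings.new_instance().in_streaming_mode().build())

# Hypothetical source-only table: 'scan.query' replaces 'table-name', so the
# connector would behave like a read-only view over the query result and
# could not be used as a sink.
t_env.execute_sql("""
    CREATE TABLE joined_view (
        id BIGINT,
        name STRING
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://localhost:3306/db1',
        'scan.query' = 'SELECT a.id, b.name FROM db1.A a JOIN db1.B b ON a.id = b.id',
        'username' = 'user',
        'password' = 'pass'
    )
""")
{code}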



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17826) Add missing custom query support on new jdbc connector

2021-04-28 Thread Leonard Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17335131#comment-17335131
 ] 

Leonard Xu commented on FLINK-17826:


Hi, [~f.pompermaier] 

The concerns about the custom query are:
(1) After adding a custom query, the JDBC table is semantically a view over 
JDBC (e.g. select * from db1.A join db1.B) rather than a table.

(2) Such a table can probably only be used as a source table and not as a sink 
table, as in the example above.

Based on these, I tend not to add this unclear option.

But if you're interested in this issue, you can take it over and I can help 
review and merge. We can add a specification for (1) and more checks for (2).

 

> Add missing custom query support on new jdbc connector
> --
>
> Key: FLINK-17826
> URL: https://issues.apache.org/jira/browse/FLINK-17826
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Reporter: Jark Wu
>Assignee: Leonard Xu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.13.0
>
>
> In FLINK-17361, we added custom query support on JDBC tables, but missed adding 
> the same ability to the new JDBC connector (i.e. 
> {{JdbcDynamicTableSourceSinkFactory}}). 
> In the new JDBC connector, maybe we should call it {{scan.query}} to keep it 
> consistent with other scan options. Besides, we need to make {{"table-name"}} 
> optional, but add validation that "table-name" and "scan.query" shouldn't 
> both be empty, and that "table-name" must not be empty when used as a sink.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-22519) Have python-archives also take tar.gz

2021-04-28 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu reassigned FLINK-22519:
---

Assignee: Yik San Chan

> Have python-archives also take tar.gz
> -
>
> Key: FLINK-22519
> URL: https://issues.apache.org/jira/browse/FLINK-22519
> Project: Flink
>  Issue Type: New Feature
>  Components: API / Python
>Reporter: Yik San Chan
>Assignee: Yik San Chan
>Priority: Major
>
> [python-archives|https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/python_config.html#python-archives]
>  currently only takes zip.
> In our use case, we want to package the whole conda environment into 
> python-archives, similar to how the 
> [docs|https://ci.apache.org/projects/flink/flink-docs-stable/dev/python/faq.html#cluster]
>  suggest using a venv (Python virtual environment). As we use PyFlink for ML, 
> there are inevitably a few large dependencies (tensorflow, torch, 
> pyarrow), as well as a lot of small dependencies.
> This pattern is not friendly to zip. According to this 
> [post|https://superuser.com/a/173825], zip compresses each file 
> independently and does not perform well when dealing with a lot of small 
> files. Tar, on the other hand, simply bundles all files into a tarball, and 
> we can then apply gzip to the whole tarball to achieve a smaller size. This may 
> explain why the official packaging tool, [conda 
> pack|https://conda.github.io/conda-pack/], produces tar.gz by default, even 
> though zip is an option if we really want it.
> To further prove the idea, I used my laptop and a conda env to run an 
> experiment. My OS: macOS 10.15.7
>  # Create an environment.yaml as well as a requirements.txt
>  # Run `conda env create -f environment.yaml` to create the conda env
>  # Run conda pack to produce a tar.gz
>  # Run conda pack featflow-ml-env.zip to produce a zip
> More details:
> environment.yaml
> {code:yaml}
> name: featflow-ml-env
> channels:
> - pytorch
> - conda-forge
> - defaults
> dependencies:
> - python=3.7
> - pytorch=1.8.0
> - scikit-learn=0.23.2
> - pip
> - pip:
> - -r file:requirements.txt
> {code}
> requirements.txt
> {code:yaml}
> apache-flink==1.12.0
> deepctr-torch==0.2.6
> black==20.8b1
> confluent-kafka==1.6.0
> pytest==6.2.2
> testcontainers==3.4.0
> kafka-python==2.0.2
> {code}
>  
> End result: the tar.gz is 854M, the zip is 1.6G
> So, long story short, python-archives only supports zip, while zip is not a 
> good choice for packaging ML libs. Let's change this by adding tar.gz support 
> to python-archives.
> The change will work as follows: in ProcessPythonEnvironmentManager.java, 
> check the suffix; if it is tar.gz, unarchive it using a gzip decompressor.
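
A sketch of the proposed suffix check, written in Python purely for illustration; per the ticket, the actual change would live in ProcessPythonEnvironmentManager.java, and the example archive name is made up.

{code:python}
import tarfile
import zipfile


def extract_archive(archive_path, target_dir):
    # Proposed behaviour: pick the unarchiver based on the file suffix.
    if archive_path.endswith(".tar.gz") or archive_path.endswith(".tgz"):
        # "r:gz" reads the tarball through a gzip decompressor.
        with tarfile.open(archive_path, "r:gz") as tar:
            tar.extractall(target_dir)
    else:
        # Current behaviour: python-archives are assumed to be zip files.
        with zipfile.ZipFile(archive_path) as zf:
            zf.extractall(target_dir)


# e.g. extract_archive("featflow-ml-env.tar.gz", "/tmp/python-archives/env")
{code}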



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-20951) IllegalArgumentException when reading Hive parquet table if condition not contain all partitioned fields

2021-04-28 Thread Rui Li (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20951?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Li updated FLINK-20951:
---
Fix Version/s: (was: 1.12.4)
   (was: 1.13.0)

> IllegalArgumentException when reading Hive parquet table if condition not 
> contain all partitioned fields
> 
>
> Key: FLINK-20951
> URL: https://issues.apache.org/jira/browse/FLINK-20951
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.12.0
> Environment: flink 1.12.0release-12
> sql-cli
>Reporter: YUJIANBO
>Assignee: Rui Li
>Priority: Critical
>  Labels: stale-assigned
>
> The production Hive table is partitioned by two fields: datekey and event.
> I have done this test with the Flink SQL CLI (in Spark SQL everything is OK):
> (1)First:
> SELECT vid From table_A WHERE datekey = '20210112' AND event = 'XXX' AND vid
> = 'aa';(OK)
> SELECT vid From table_A WHERE datekey = '20210112' AND vid = 'aa';
> (Error)
> (2)Second:
> SELECT vid From table_B WHERE datekey = '20210112' AND event = 'YYY' AND vid
> = 'bb';(OK)
> SELECT vid From table_B WHERE datekey = '20210112' AND vid = 'bb';
> (Error)
> The exception is:
> {code}
> java.lang.RuntimeException: One or more fetchers have encountered exception
> at
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:199)
> at
> org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:154)
> at
> org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:116)
> at
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:273)
> at
> org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:67)
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: SplitFetcher thread 19 received
> unexpected exception while polling the records
> at
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:146)
> at
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:101)
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> ... 1 more
> Caused by: java.lang.IllegalArgumentException
> at java.nio.Buffer.position(Buffer.java:244)
> at
> org.apache.flink.hive.shaded.parquet.io.api.Binary$ByteBufferBackedBinary.getBytes(Binary.java:424)
> at
> org.apache.flink.hive.shaded.formats.parquet.vector.reader.BytesColumnReader.readBatchFromDictionaryIds(BytesColumnReader.java:79)
> at
> org.apache.flink.hive.shaded.formats.parquet.vector.reader.BytesColumnReader.readBatchFromDictionaryIds(BytesColumnReader.java:33)
> at
> org.apache.flink.hive.shaded.formats.parquet.vector.reader.AbstractColumnReader.readToVector(AbstractColumnReader.java:199)
> at
> org.apache.flink.hive.shaded.formats.parquet.ParquetVectorizedInputFormat$ParquetReader.nextBatch(ParquetVectorizedInputFormat.java:359)
> at
> org.apache.flink.hive.shaded.formats.parquet.ParquetVectorizedInputFormat$ParquetReader.readBatch(ParquetVectorizedInputFormat.java:328)
> at
> org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:67)
> at
> org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:56)
> at
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:138)
> ... 6 more
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] dianfu closed pull request #15783: [hotfix][python][docs] Fix python.archives docs

2021-04-28 Thread GitBox


dianfu closed pull request #15783:
URL: https://github.com/apache/flink/pull/15783


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-22519) Have python-archives also take tar.gz

2021-04-28 Thread Yik San Chan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yik San Chan updated FLINK-22519:
-
Description: 
[python-archives|https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/python_config.html#python-archives]
 currently only takes zip.

In our use case, we want to package the whole conda environment into 
python-archives, similar to how the 
[docs|https://ci.apache.org/projects/flink/flink-docs-stable/dev/python/faq.html#cluster]
 suggest about using venv (Python virtual environment). As we use PyFlink for 
ML, there are inevitably a few large dependencies (tensorflow, torch, pyarrow), 
as well as a lot of small dependencies.

This pattern is not friendly for zip. According to the 
[post|https://superuser.com/a/173825], zip compresses each file independently, 
and it is not performing good when dealing with a lot of small files. On the 
other hand, tar simply bundles all files into a tarball, then we can apply gzip 
to the whole tarball to achieve smaller size. This may explain why the official 
packaging tool - conda pack -  [conda pack|https://conda.github.io/conda-pack/] 
- produces tar.gz by default, even though zip is an option if we really want to.

To further prove the idea, I use my laptop and conda env to run an experiment. 
My OS: macOS 10.15.7
 # Create an environment.yaml as well as a requirements.txt
 # Run `conda env create -f environment.yaml` to create the conda env
 # Run conda pack to produce a tar.gz
 # Run conda pack faetflow-ml-env.zip to produce a zip

More details:

environment.yaml

{code:yaml}
name: featflow-ml-env
channels:
- pytorch
- conda-forge
- defaults
dependencies:
- python=3.7
- pytorch=1.8.0
- scikit-learn=0.23.2
- pip
- pip:
- -r file:requirements.txt
{code}

requirements.txt

{code:yaml}
apache-flink==1.12.0
deepctr-torch==0.2.6
black==20.8b1
confluent-kafka==1.6.0
pytest==6.2.2
testcontainers==3.4.0
kafka-python==2.0.2
{code}
 
End result: the tar.gz is 854M, the zip is 1.6G

So, long story short, python-archives only support zip, while zip is not a good 
choice for packaging ML libs. Let's change this by adding python-archives 
tar.gz support.

Change will happen in this way: In ProcessPythonEnvironmentManager.java, check 
the suffix. If tar.gz, unarchive it using gzip decompresser.

  was:
[python-archives|https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/python_config.html#python-archives]
 currently only takes zip.

In our use case, we want to package the whole conda environment into 
python-archives, similar to how the 
[docs|https://ci.apache.org/projects/flink/flink-docs-stable/dev/python/faq.html#cluster]
 suggest about using venv (Python virtual environment). As we use PyFlink for 
ML, there are inevitably a few large dependencies (tensorflow, torch, pyarrow), 
as well as a lot of small dependencies.

This pattern is not friendly for zip. According to the 
[post|https://superuser.com/a/173825], zip compresses each file independently, 
and it is not performing good when dealing with a lot of small files. On the 
other hand, tar simply bundles all files into a tarball, then we can apply gzip 
to the whole tarball to achieve smaller size. This may explain why the official 
packaging tool - conda pack -  [conda pack|https://conda.github.io/conda-pack/] 
- produces tar.gz by default, even though zip is an option if we really want to.

To further prove the idea, I use my laptop and conda env to run an experiment. 
My OS: macOS 10.15.7
 # Create an environment.yaml as well as a requirements.txt
 # Run `conda env create -f environment.yaml` to create the conda env
 # Run conda pack to produce a tar.gz
 # Run conda pack faetflow-ml-env.zip to produce a zip

More details:

environment.yaml


{code:txt}
name: featflow-ml-env
channels:
- pytorch
- conda-forge
- defaults
dependencies:
- python=3.7
- pytorch=1.8.0
- scikit-learn=0.23.2
- pip
- pip:
- -r file:requirements.txt
{code}

requirements.txt
```
apache-flink==1.12.0
deepctr-torch==0.2.6
black==20.8b1
confluent-kafka==1.6.0
pytest==6.2.2
testcontainers==3.4.0
kafka-python==2.0.2
```
 
End result: the tar.gz is 854M, the zip is 1.6G

So, long story short, python-archives only support zip, while zip is not a good 
choice for packaging ML libs. Let's change this by adding python-archives 
tar.gz support.

Change will happen in this way: In ProcessPythonEnvironmentManager.java, check 
the suffix. If tar.gz, unarchive it using gzip decompresser.


> Have python-archives also take tar.gz
> -
>
> Key: FLINK-22519
> URL: https://issues.apache.org/jira/browse/FLINK-22519
> Project: Flink
>  Issue Type: New Feature
>  Components: API / Python
>Reporter: Yik San Chan
>Priority: Major
>
> 

[jira] [Commented] (FLINK-20951) IllegalArgumentException when reading Hive parquet table if condition not contain all partitioned fields

2021-04-28 Thread Rui Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17335129#comment-17335129
 ] 

Rui Li commented on FLINK-20951:


Removing the fix version since this is not easy to reproduce. And there's a 
chance that it's related to FLINK-22202.

> IllegalArgumentException when reading Hive parquet table if condition not 
> contain all partitioned fields
> 
>
> Key: FLINK-20951
> URL: https://issues.apache.org/jira/browse/FLINK-20951
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.12.0
> Environment: flink 1.12.0release-12
> sql-cli
>Reporter: YUJIANBO
>Assignee: Rui Li
>Priority: Critical
>  Labels: stale-assigned
> Fix For: 1.13.0, 1.12.4
>
>
> The production Hive table is partitioned by two fields: datekey and event.
> I have done this test with the Flink SQL CLI (in Spark SQL everything is OK):
> (1)First:
> SELECT vid From table_A WHERE datekey = '20210112' AND event = 'XXX' AND vid
> = 'aa';(OK)
> SELECT vid From table_A WHERE datekey = '20210112' AND vid = 'aa';
> (Error)
> (2)Second:
> SELECT vid From table_B WHERE datekey = '20210112' AND event = 'YYY' AND vid
> = 'bb';(OK)
> SELECT vid From table_B WHERE datekey = '20210112' AND vid = 'bb';
> (Error)
> The exception is:
> {code}
> java.lang.RuntimeException: One or more fetchers have encountered exception
> at
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:199)
> at
> org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:154)
> at
> org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:116)
> at
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:273)
> at
> org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:67)
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: SplitFetcher thread 19 received
> unexpected exception while polling the records
> at
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:146)
> at
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:101)
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> ... 1 more
> Caused by: java.lang.IllegalArgumentException
> at java.nio.Buffer.position(Buffer.java:244)
> at
> org.apache.flink.hive.shaded.parquet.io.api.Binary$ByteBufferBackedBinary.getBytes(Binary.java:424)
> at
> org.apache.flink.hive.shaded.formats.parquet.vector.reader.BytesColumnReader.readBatchFromDictionaryIds(BytesColumnReader.java:79)
> at
> org.apache.flink.hive.shaded.formats.parquet.vector.reader.BytesColumnReader.readBatchFromDictionaryIds(BytesColumnReader.java:33)
> at
> org.apache.flink.hive.shaded.formats.parquet.vector.reader.AbstractColumnReader.readToVector(AbstractColumnReader.java:199)
> at
> org.apache.flink.hive.shaded.formats.parquet.ParquetVectorizedInputFormat$ParquetReader.nextBatch(ParquetVectorizedInputFormat.java:359)
> at
> org.apache.flink.hive.shaded.formats.parquet.ParquetVectorizedInputFormat$ParquetReader.readBatch(ParquetVectorizedInputFormat.java:328)
> at
> org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:67)
> at
> org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:56)
> at
> 

[jira] [Updated] (FLINK-22519) Have python-archives also take tar.gz

2021-04-28 Thread Yik San Chan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yik San Chan updated FLINK-22519:
-
Description: 
[python-archives|https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/python_config.html#python-archives]
 currently only takes zip.

In our use case, we want to package the whole conda environment into 
python-archives, similar to how the 
[docs|https://ci.apache.org/projects/flink/flink-docs-stable/dev/python/faq.html#cluster]
 suggest about using venv (Python virtual environment). As we use PyFlink for 
ML, there are inevitably a few large dependencies (tensorflow, torch, pyarrow), 
as well as a lot of small dependencies.

This pattern is not friendly for zip. According to the 
[post|https://superuser.com/a/173825], zip compresses each file independently, 
and it is not performing good when dealing with a lot of small files. On the 
other hand, tar simply bundles all files into a tarball, then we can apply gzip 
to the whole tarball to achieve smaller size. This may explain why the official 
packaging tool - conda pack -  [conda pack|https://conda.github.io/conda-pack/] 
- produces tar.gz by default, even though zip is an option if we really want to.

To further prove the idea, I use my laptop and conda env to run an experiment. 
My OS: macOS 10.15.7
 # Create an environment.yaml as well as a requirements.txt
 # Run `conda env create -f environment.yaml` to create the conda env
 # Run conda pack to produce a tar.gz
 # Run conda pack faetflow-ml-env.zip to produce a zip

More details:

environment.yaml


{code:txt}
name: featflow-ml-env
channels:
- pytorch
- conda-forge
- defaults
dependencies:
- python=3.7
- pytorch=1.8.0
- scikit-learn=0.23.2
- pip
- pip:
- -r file:requirements.txt
{code}

requirements.txt
```
apache-flink==1.12.0
deepctr-torch==0.2.6
black==20.8b1
confluent-kafka==1.6.0
pytest==6.2.2
testcontainers==3.4.0
kafka-python==2.0.2
```
 
End result: the tar.gz is 854M, the zip is 1.6G

So, long story short, python-archives only support zip, while zip is not a good 
choice for packaging ML libs. Let's change this by adding python-archives 
tar.gz support.

Change will happen in this way: In ProcessPythonEnvironmentManager.java, check 
the suffix. If tar.gz, unarchive it using gzip decompresser.

  was:
[python-archives|https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/python_config.html#python-archives]
 currently only takes zip.

In our use case, we want to package the whole conda environment into 
python-archives, similar to how the 
[docs|https://ci.apache.org/projects/flink/flink-docs-stable/dev/python/faq.html#cluster]
 suggest about using venv (Python virtual environment). As we use PyFlink for 
ML, there are inevitably a few large dependencies (tensorflow, torch, pyarrow), 
as well as a lot of small dependencies.

This pattern is not friendly for zip. According to the 
[post|https://superuser.com/a/173825], zip compresses each file independently, 
and it is not performing good when dealing with a lot of small files. On the 
other hand, tar simply bundles all files into a tarball, then we can apply gzip 
to the whole tarball to achieve smaller size. This may explain why the official 
packaging tool - conda pack -  [conda pack|https://conda.github.io/conda-pack/] 
- produces tar.gz by default, even though zip is an option if we really want to.

To further prove the idea, I use my laptop and conda env to run an experiment. 
My OS: macOS 10.15.7
 # Create an environment.yaml as well as a requirements.txt
 # Run `conda env create -f environment.yaml` to create the conda env
 # Run conda pack to produce a tar.gz
 # Run conda pack faetflow-ml-env.zip to produce a zip

More details:

environment.yaml
```
name: featflow-ml-env
channels:
- pytorch
- conda-forge
- defaults
dependencies:
- python=3.7
- pytorch=1.8.0
- scikit-learn=0.23.2
- pip
- pip:
- -r file:requirements.txt
```
requirements.txt
```
apache-flink==1.12.0
deepctr-torch==0.2.6
black==20.8b1
confluent-kafka==1.6.0
pytest==6.2.2
testcontainers==3.4.0
kafka-python==2.0.2
```
 
End result: the tar.gz is 854M, the zip is 1.6G

So, long story short, python-archives only support zip, while zip is not a good 
choice for packaging ML libs. Let's change this by adding python-archives 
tar.gz support.

Change will happen in this way: In ProcessPythonEnvironmentManager.java, check 
the suffix. If tar.gz, unarchive it using gzip decompresser.


> Have python-archives also take tar.gz
> -
>
> Key: FLINK-22519
> URL: https://issues.apache.org/jira/browse/FLINK-22519
> Project: Flink
>  Issue Type: New Feature
>  Components: API / Python
>Reporter: Yik San Chan
>Priority: Major
>
> 

[jira] [Updated] (FLINK-22519) Have python-archives also take tar.gz

2021-04-28 Thread Yik San Chan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yik San Chan updated FLINK-22519:
-
Description: 
[python-archives|https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/python_config.html#python-archives]
 currently only takes zip.

In our use case, we want to package the whole conda environment into 
python-archives, similar to how the 
[docs|https://ci.apache.org/projects/flink/flink-docs-stable/dev/python/faq.html#cluster]
 suggest about using venv (Python virtual environment). As we use PyFlink for 
ML, there are inevitably a few large dependencies (tensorflow, torch, pyarrow), 
as well as a lot of small dependencies.

This pattern is not friendly for zip. According to the 
[post|https://superuser.com/a/173825], zip compresses each file independently, 
and it is not performing good when dealing with a lot of small files. On the 
other hand, tar simply bundles all files into a tarball, then we can apply gzip 
to the whole tarball to achieve smaller size. This may explain why the official 
packaging tool - conda pack -  [conda pack|https://conda.github.io/conda-pack/] 
- produces tar.gz by default, even though zip is an option if we really want to.

To further prove the idea, I use my laptop and conda env to run an experiment. 
My OS: macOS 10.15.7
 # Create an environment.yaml as well as a requirements.txt
 # Run `conda env create -f environment.yaml` to create the conda env
 # Run conda pack to produce a tar.gz
 # Run conda pack faetflow-ml-env.zip to produce a zip

More details:

environment.yaml
```
name: featflow-ml-env
channels:
- pytorch
- conda-forge
- defaults
dependencies:
- python=3.7
- pytorch=1.8.0
- scikit-learn=0.23.2
- pip
- pip:
- -r file:requirements.txt
```
requirements.txt
```
apache-flink==1.12.0
deepctr-torch==0.2.6
black==20.8b1
confluent-kafka==1.6.0
pytest==6.2.2
testcontainers==3.4.0
kafka-python==2.0.2
```
 
End result: the tar.gz is 854M, the zip is 1.6G

So, long story short, python-archives only support zip, while zip is not a good 
choice for packaging ML libs. Let's change this by adding python-archives 
tar.gz support.

Change will happen in this way: In ProcessPythonEnvironmentManager.java, check 
the suffix. If tar.gz, unarchive it using gzip decompresser.

  was:
[python-archives|https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/python_config.html#python-archives]
 currently only takes zip.

In our use case, we want to package the whole conda environment into 
python-archives, similar to how the 
[docs|https://ci.apache.org/projects/flink/flink-docs-stable/dev/python/faq.html#cluster]
 suggest about using venv (Python virtual environment). As we use PyFlink for 
ML, there are inevitably a few large dependencies (tensorflow, torch, pyarrow), 
as well as a lot of small dependencies.

This pattern is not friendly for zip. According to the 
[post|https://superuser.com/a/173825], zip compresses each file independently, 
and it is not performing good when dealing with a lot of small files. On the 
other hand, tar simply bundles all files into a tarball, then we can apply gzip 
to the whole tarball to achieve smaller size. This may explain why the official 
packaging tool - conda pack -  [conda pack|https://conda.github.io/conda-pack/] 
- produces tar.gz by default, even though zip is an option if we really want to.

To further prove the idea, I use my laptop and conda env to run an experiment. 
My OS: macOS 10.15.7
 # Create an environment.yaml as well as a requirements.txt
 # Run `conda env create -f environment.yaml` to create the conda env
 # Run conda pack to produce a tar.gz
 # Run conda pack faetflow-ml-env.zip to produce a zip

More details

```
# environment.yaml
name: featflow-ml-env
channels:
- pytorch
- conda-forge
- defaults
dependencies:
- python=3.7
- pytorch=1.8.0
- scikit-learn=0.23.2
- pip
- pip:
- -r file:requirements.txt
```
```
#requirements.txt
apache-flink==1.12.0
deepctr-torch==0.2.6
black==20.8b1
confluent-kafka==1.6.0
pytest==6.2.2
testcontainers==3.4.0
kafka-python==2.0.2
```
 
End result: the tar.gz is 854M, the zip is 1.6G

So, long story short, python-archives only support zip, while zip is not a good 
choice for packaging ML libs. Let's change this by adding python-archives 
tar.gz support.

Change will happen in this way: In ProcessPythonEnvironmentManager.java, check 
the suffix. If tar.gz, unarchive it using gzip decompresser.


> Have python-archives also take tar.gz
> -
>
> Key: FLINK-22519
> URL: https://issues.apache.org/jira/browse/FLINK-22519
> Project: Flink
>  Issue Type: New Feature
>  Components: API / Python
>Reporter: Yik San Chan
>Priority: Major
>
> 

[jira] [Updated] (FLINK-22519) Have python-archives also take tar.gz

2021-04-28 Thread Yik San Chan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yik San Chan updated FLINK-22519:
-
Description: 
[python-archives|https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/python_config.html#python-archives]
 currently only takes zip.

In our use case, we want to package the whole conda environment into 
python-archives, similar to how the 
[docs|https://ci.apache.org/projects/flink/flink-docs-stable/dev/python/faq.html#cluster]
 suggest about using venv (Python virtual environment). As we use PyFlink for 
ML, there are inevitably a few large dependencies (tensorflow, torch, pyarrow), 
as well as a lot of small dependencies.

This pattern is not friendly for zip. According to the 
[post|https://superuser.com/a/173825], zip compresses each file independently, 
and it is not performing good when dealing with a lot of small files. On the 
other hand, tar simply bundles all files into a tarball, then we can apply gzip 
to the whole tarball to achieve smaller size. This may explain why the official 
packaging tool - conda pack -  [conda pack|https://conda.github.io/conda-pack/] 
- produces tar.gz by default, even though zip is an option if we really want to.

To further prove the idea, I use my laptop and conda env to run an experiment. 
My OS: macOS 10.15.7
 # Create an environment.yaml as well as a requirements.txt
 # Run `conda env create -f environment.yaml` to create the conda env
 # Run conda pack to produce a tar.gz
 # Run conda pack faetflow-ml-env.zip to produce a zip

More details

```
# environment.yaml
name: featflow-ml-env
channels:
- pytorch
- conda-forge
- defaults
dependencies:
- python=3.7
- pytorch=1.8.0
- scikit-learn=0.23.2
- pip
- pip:
- -r file:requirements.txt
```
```
#requirements.txt
apache-flink==1.12.0
deepctr-torch==0.2.6
black==20.8b1
confluent-kafka==1.6.0
pytest==6.2.2
testcontainers==3.4.0
kafka-python==2.0.2
```
 
End result: the tar.gz is 854M, the zip is 1.6G

So, long story short, python-archives only support zip, while zip is not a good 
choice for packaging ML libs. Let's change this by adding python-archives 
tar.gz support.

Change will happen in this way: In ProcessPythonEnvironmentManager.java, check 
the suffix. If tar.gz, unarchive it using gzip decompresser.

  was:
[python-archives|https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/python_config.html#python-archives]
 currently only takes zip.

 

In our use case, we want to package the whole conda environment into 
python-archives, similar to how the 
[docs|https://ci.apache.org/projects/flink/flink-docs-stable/dev/python/faq.html#cluster]
 suggest about using venv (Python virtual environment). As we use PyFlink for 
ML, there are inevitably a few large dependencies (tensorflow, torch, pyarrow), 
as well as a lot of small dependencies.

 

This pattern is not friendly for zip. According to the 
[post|https://superuser.com/a/173825], zip compresses each file independently, 
and it is not performing good when dealing with a lot of small files. On the 
other hand, tar simply bundles all files into a tarball, then we can apply gzip 
to the whole tarball to achieve smaller size. This may explain why the official 
packaging tool - conda pack -  [conda pack|https://conda.github.io/conda-pack/] 
- produces tar.gz by default, even though zip is an option if we really want to.

 

To further prove the idea, I use my laptop and conda env to run an experiment. 
My OS: macOS 10.15.7
 # Create an environment.yaml as well as a requirements.txt
 # Run `conda env create -f environment.yaml` to create the conda env
 
 # Run conda pack to produce a tar.gz
 # Run conda pack faetflow-ml-env.zip to produce a zip

 

# environment.yaml

 
name: featflow-ml-env
channels:
- pytorch
- conda-forge
- defaults
dependencies:
- python=3.7
- pytorch=1.8.0
- scikit-learn=0.23.2
- pip
- pip:
- -r file:requirements.txt
 
#requirements.txt
apache-flink==1.12.0
deepctr-torch==0.2.6
black==20.8b1
confluent-kafka==1.6.0
pytest==6.2.2
testcontainers==3.4.0
kafka-python==2.0.2
 
End result: the tar.gz is 854M, the zip is 1.6G

 

So, long story short, python-archives only support zip, while zip is not a good 
choice for packaging ML libs. Let's change this by adding python-archives 
tar.gz support.

 

Change will happen in this way: In ProcessPythonEnvironmentManager.java, check 
the suffix. If tar.gz, unarchive it using gzip decompresser.


> Have python-archives also take tar.gz
> -
>
> Key: FLINK-22519
> URL: https://issues.apache.org/jira/browse/FLINK-22519
> Project: Flink
>  Issue Type: New Feature
>  Components: API / Python
>Reporter: Yik San Chan
>Priority: Major
>
> 

[jira] [Created] (FLINK-22519) Have python-archives also take tar.gz

2021-04-28 Thread Yik San Chan (Jira)
Yik San Chan created FLINK-22519:


 Summary: Have python-archives also take tar.gz
 Key: FLINK-22519
 URL: https://issues.apache.org/jira/browse/FLINK-22519
 Project: Flink
  Issue Type: New Feature
  Components: API / Python
Reporter: Yik San Chan


[python-archives|https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/python_config.html#python-archives]
 currently only takes zip.

 

In our use case, we want to package the whole conda environment into 
python-archives, similar to what the 
[docs|https://ci.apache.org/projects/flink/flink-docs-stable/dev/python/faq.html#cluster]
 suggest for venv (Python virtual environments). As we use PyFlink for 
ML, there are inevitably a few large dependencies (tensorflow, torch, pyarrow), 
as well as a lot of small dependencies.

 

This pattern is not friendly for zip. According to this 
[post|https://superuser.com/a/173825], zip compresses each file independently, 
and it does not perform well when dealing with a lot of small files. On the 
other hand, tar simply bundles all files into a tarball, and we can then apply gzip 
to the whole tarball to achieve a smaller size. This may explain why the official 
packaging tool, [conda pack|https://conda.github.io/conda-pack/], produces tar.gz 
by default, even though zip is available as an option if we really want it.

 

To further prove the idea, I used my laptop and a conda env to run an experiment. 
My OS: macOS 10.15.7
 # Create an environment.yaml as well as a requirements.txt
 # Run `conda env create -f environment.yaml` to create the conda env
 # Run `conda pack` to produce a tar.gz
 # Run `conda pack -o featflow-ml-env.zip` to produce a zip

 

# environment.yaml

 
name: featflow-ml-env
channels:
- pytorch
- conda-forge
- defaults
dependencies:
- python=3.7
- pytorch=1.8.0
- scikit-learn=0.23.2
- pip
- pip:
- -r file:requirements.txt
 
#requirements.txt
apache-flink==1.12.0
deepctr-torch==0.2.6
black==20.8b1
confluent-kafka==1.6.0
pytest==6.2.2
testcontainers==3.4.0
kafka-python==2.0.2
 
End result: the tar.gz is 854M, the zip is 1.6G

 

So, long story short, python-archives only supports zip, but zip is not a good 
choice for packaging ML libs. Let's change this by adding tar.gz support to 
python-archives.

 

The change will work as follows: in ProcessPythonEnvironmentManager.java, check 
the file suffix; if it is tar.gz, unarchive it using a gzip decompressor.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-22518) Translate the page of "High Availability (HA)" into Chinese

2021-04-28 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22518?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu reassigned FLINK-22518:
---

Assignee: movesan Yang

> Translate the page of "High Availability (HA)" into Chinese
> ---
>
> Key: FLINK-22518
> URL: https://issues.apache.org/jira/browse/FLINK-22518
> Project: Flink
>  Issue Type: New Feature
>  Components: chinese-translation
>Reporter: movesan Yang
>Assignee: movesan Yang
>Priority: Major
>
>  
> The module of "High Availability (HA)" contains the following three pages: 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/deployment/ha]
> [https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/deployment/ha/zookeeper_ha.html]
> [https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/deployment/ha/kubernetes_ha.html]
> The markdown file can be found in 
> [https://github.com/apache/flink/tree/master/docs/content.zh/docs/deployment/ha]
>  in English.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22497) When using DefaultRollingPolicy in StreamingFileSink, the file will be finished delayed

2021-04-28 Thread ChangjiGuo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ChangjiGuo updated FLINK-22497:
---
Environment: hadoop-2.8.4,Flink-1.11.2  (was: hadoop-2.8.4)

> When using DefaultRollingPolicy in StreamingFileSink, the file will be 
> finished delayed
> ---
>
> Key: FLINK-22497
> URL: https://issues.apache.org/jira/browse/FLINK-22497
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Affects Versions: 1.11.2
> Environment: hadoop-2.8.4,Flink-1.11.2
>Reporter: ChangjiGuo
>Priority: Major
> Attachments: 1.png
>
>
> I had a doubt when testing StreamingFileSink:
> The default 60s rolling interval in DefaultRollingPolicy is checked by 
> procTimeService. If the rolling interval has not been reached at that check, 
> the roll is delayed to the next timer trigger point (after another 60s), so this 
> is not real-time. For example, if the checkpoint period is set to 60s, the file 
> should be converted to finished at the second checkpoint, but it will be 
> delayed to the third checkpoint.
> You can refer to the attached picture for details.
> If we add rollingPolicy.shouldRollOnProcessingTime to the if condition of the 
> Bucket.write method, the file will be set to finished as we expect at the 
> second checkpoint.
> {code:java}
> void write(IN element, long currentTime) throws IOException {
>     if (inProgressPart == null
>             || rollingPolicy.shouldRollOnEvent(inProgressPart, element)
>             || rollingPolicy.shouldRollOnProcessingTime(inProgressPart, currentTime)) {
>         if (LOG.isDebugEnabled()) {
>             LOG.debug("Subtask {} closing in-progress part file for bucket id={} due to element {}.",
>                     subtaskIndex, bucketId, element);
>         }
>         rollPartFile(currentTime);
>     }
>     inProgressPart.write(element, currentTime);
> }
> {code}
>  
>  Is my understanding correct? Or can we do this? 
>  Thanks! ^_^



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22499) JDBC sink table-api support "sink.parallelism" ?

2021-04-28 Thread ranqiqiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17335122#comment-17335122
 ] 

ranqiqiang commented on FLINK-22499:


Thanks, but the version is jdbc {{1.14-SNAPSHOT}}.

If I use Flink 1.12.2, can I then use the jdbc-connector {{1.14-SNAPSHOT}}?

> JDBC  sink table-api  support "sink.parallelism" ?
> --
>
> Key: FLINK-22499
> URL: https://issues.apache.org/jira/browse/FLINK-22499
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / JDBC
>Affects Versions: 1.12.1
>Reporter: ranqiqiang
>Priority: Major
>
> I receive messages from Kafka, then sink them to MySQL/TiDB via the SQL 
> Table API.
> How can I change the sink parallelism? 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22266) Harden JobMasterStopWithSavepointITCase

2021-04-28 Thread Guowei Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17335120#comment-17335120
 ] 

Guowei Ma commented on FLINK-22266:
---

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=17368=logs=8fd9202e-fd17-5b26-353c-ac1ff76c8f28=a0a633b8-47ef-5c5a-2806-3c13b9e48228=4591

> Harden JobMasterStopWithSavepointITCase
> ---
>
> Key: FLINK-22266
> URL: https://issues.apache.org/jira/browse/FLINK-22266
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / Coordination
>Affects Versions: 1.13.0, 1.14.0
>Reporter: Guowei Ma
>Assignee: Dawid Wysakowicz
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.13.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=16451=logs=5c8e7682-d68f-54d1-16a2-a09310218a49=f508e270-48d6-5f1e-3138-42a17e0714f0=3884
> {code:java}
> [ERROR] 
> throwingExceptionOnCallbackWithNoRestartsShouldFailTheTerminate(org.apache.flink.runtime.jobmaster.JobMasterStopWithSavepointITCase)
>   Time elapsed: 0.154 s  <<< FAILURE!
> java.lang.AssertionError
>   at org.junit.Assert.fail(Assert.java:86)
>   at org.junit.Assert.assertTrue(Assert.java:41)
>   at org.junit.Assert.assertTrue(Assert.java:52)
>   at 
> org.apache.flink.runtime.jobmaster.JobMasterStopWithSavepointITCase.throwingExceptionOnCallbackWithoutRestartsHelper(JobMasterStopWithSavepointITCase.java:154)
>   at 
> org.apache.flink.runtime.jobmaster.JobMasterStopWithSavepointITCase.throwingExceptionOnCallbackWithNoRestartsShouldFailTheTerminate(JobMasterStopWithSavepointITCase.java:138)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22518) Translate the page of "High Availability (HA)" into Chinese

2021-04-28 Thread movesan (Jira)
movesan created FLINK-22518:
---

 Summary: Translate the page of "High Availability (HA)" into 
Chinese
 Key: FLINK-22518
 URL: https://issues.apache.org/jira/browse/FLINK-22518
 Project: Flink
  Issue Type: New Feature
  Components: chinese-translation
Reporter: movesan


 

The module of "High Availability (HA)" contains the following three pages: 

[https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/deployment/ha]

[https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/deployment/ha/zookeeper_ha.html]

[https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/deployment/ha/kubernetes_ha.html]

The markdown file can be found in 
[https://github.com/apache/flink/tree/master/docs/content.zh/docs/deployment/ha]
 in English.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] wuchong closed pull request #15745: [FLINK-22304][table] Refactor some interfaces for TVF based window to…

2021-04-28 Thread GitBox


wuchong closed pull request #15745:
URL: https://github.com/apache/flink/pull/15745


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-web] wuchong commented on pull request #436: Add Apache Flink release 1.13.0

2021-04-28 Thread GitBox


wuchong commented on pull request #436:
URL: https://github.com/apache/flink-web/pull/436#issuecomment-828916305


   LGTM.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-22518) Translate the page of "High Availability (HA)" into Chinese

2021-04-28 Thread movesan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17335114#comment-17335114
 ] 

movesan commented on FLINK-22518:
-

Hi [~jark]. I am very interested in this job. Could you please assign this 
ticket to me? Thanks a lot.

> Translate the page of "High Availability (HA)" into Chinese
> ---
>
> Key: FLINK-22518
> URL: https://issues.apache.org/jira/browse/FLINK-22518
> Project: Flink
>  Issue Type: New Feature
>  Components: chinese-translation
>Reporter: movesan
>Priority: Major
>
>  
> The module of "High Availability (HA)" contains the following three pages: 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/deployment/ha]
> [https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/deployment/ha/zookeeper_ha.html]
> [https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/deployment/ha/kubernetes_ha.html]
> The markdown file can be found in 
> [https://github.com/apache/flink/tree/master/docs/content.zh/docs/deployment/ha]
>  in English.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-16428) Fine-grained network buffer management for backpressure

2021-04-28 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16428?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao updated FLINK-16428:

Labels:   (was: stale-critical)

> Fine-grained network buffer management for backpressure
> ---
>
> Key: FLINK-16428
> URL: https://issues.apache.org/jira/browse/FLINK-16428
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: Zhijiang
>Priority: Critical
> Fix For: 1.14.0
>
>
> It is an umbrella ticket for tracking the progress of this improvement.
> This is the second ingredient to solve the “checkpoints under backpressure” 
> problem (together with unaligned checkpoints). It consists of two steps:
>  * See if we can use less network memory in general for streaming jobs (with 
> potentially different distribution of floating buffers in the input side)
>  * Under backpressure, reduce network memory to have less in-flight data 
> (needs specification of algorithm and experiments)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] PatrickRen commented on pull request #15767: [FLINK-22393][docs-zh] Translate the page of "Execution Mode (Batch/Streaming)" into Chinese

2021-04-28 Thread GitBox


PatrickRen commented on pull request #15767:
URL: https://github.com/apache/flink/pull/15767#issuecomment-828900787


   Thanks for the contribution @95chenjz ! But it looks like a new `.md` file 
was created for the Chinese documentation. I think the translation should be 
made in the file `docs/content.zh/docs/dev/datastream/execution_mode.md` 
instead of creating a new one. 
   
   Also, the `README.md` under `docs` describes how to build the docs and view 
them locally in your browser. You can preview the page to see if your 
modifications take effect. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (FLINK-22517) Fix pickle compatibility problem in different Python versions

2021-04-28 Thread Huang Xingbo (Jira)
Huang Xingbo created FLINK-22517:


 Summary: Fix pickle compatibility problem in different Python 
versions
 Key: FLINK-22517
 URL: https://issues.apache.org/jira/browse/FLINK-22517
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.12.3, 1.13.0
Reporter: Huang Xingbo
Assignee: Huang Xingbo


Since release-1.12, PyFlink has supported Python 3.8. Starting from Python 3.8, 
the default protocol version used by pickle is 
pickle5 (https://www.python.org/dev/peps/pep-0574/), which will raise the 
following exception if the client uses Python 3.8 to compile the program and the 
cluster nodes use Python 3.7 or 3.6 to run the Python UDFs:

{code:python}
ValueError: unsupported pickle protocol: 5
{code}

The workaround is to make the Python version used by the client 3.6 or 3.7. 
For how to specify the client-side Python execution environment, please 
refer to the 
doc (https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/python_config.html#python-client-executable).
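
A minimal sketch of setting that option programmatically (the key name comes from the 
linked docs; the interpreter path is a made-up example, and in practice this is usually 
configured in flink-conf.yaml or via the PyFlink API):

{code:java}
import org.apache.flink.configuration.Configuration;

public class ClientExecutableExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Point client-side compilation of the job at a Python 3.7 interpreter.
        conf.setString("python.client.executable", "/usr/bin/python3.7");
        System.out.println(conf);
    }
}
{code}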




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] PatrickRen commented on a change in pull request #15763: [FLINK-22364][doc] Translate the page of "Data Sources" to Chinese.

2021-04-28 Thread GitBox


PatrickRen commented on a change in pull request #15763:
URL: https://github.com/apache/flink/pull/15763#discussion_r620113572



##
File path: docs/content.zh/docs/dev/datastream/sources.md
##
@@ -154,28 +161,32 @@ class MySplitEnumerator implements 
SplitEnumerator {
 }
 ```
 
-### SourceReader
+
+
+### 源阅读器
+
+[源阅读器](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java)是一个运行在Task
 Managers上的组件,用于处理来自分片的记录。
+
+`源阅读器`公布了一个拉动式(pull-based)处理接口。Flink任务会在循环中不断调用 `pollNext(ReaderOutput)` 
来轮询来自`源阅读器`的记录。`pollNext(ReaderOutput)` 方法的返回值指示source reader的状态。

Review comment:
   `源阅读器`提供了一个拉动式(pull-based)处理接口

##
File path: docs/content.zh/docs/dev/datastream/sources.md
##
@@ -154,28 +161,32 @@ class MySplitEnumerator implements 
SplitEnumerator {
 }
 ```
 
-### SourceReader
+
+
+### 源阅读器
+
+[源阅读器](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java)是一个运行在Task
 Managers上的组件,用于处理来自分片的记录。

Review comment:
   是一个运行在 Task Manager ~~s~~ 上的组件

##
File path: docs/content.zh/docs/dev/datastream/sources.md
##
@@ -354,32 +374,36 @@ environment.fromSource(
 String sourceName)
 ```
 
-The `TimestampAssigner` and `WatermarkGenerator` run transparently as part of 
the `ReaderOutput`(or `SourceOutput`) so source implementors do not have to 
implement any timestamp extraction and watermark generation code.
+`TimestampAssigner` 和 `WatermarkGenerator` 作为 `ReaderOutput`(或 
`SourceOutput`)的一部分透明地运行,因此source实现者不必实现任何时间戳提取和水印生成的代码。 
+
+
 
- Event Timestamps
+ 事件时间戳
 
-Event timestamps are assigned in two steps:
+事件时间戳的分配分为以下两步:
 
-  1. The SourceReader may attach the *source record timestamp* to the event, 
by calling `SourceOutput.collect(event, timestamp)`.
- This is relevant only for data sources that are record-based and have 
timestamps, such as Kafka, Kinesis, Pulsar, or Pravega.
- Sources that are not based on records with timestamps (like files) do not 
have a *source record timestamp*.
- This step is part of the source connector implementation and not 
parameterized by the application that uses the source.
+  1. 源阅读器可以通过调用 `SourceOutput.collect(event, timestamp)` 将*源记录时间戳*添加到事件中。
+ 这只与基于记录并且拥有时间戳特性的数据源有关,例如 Kafka、Kinesis、Pulsar 或 Pravega。
+ 不带有时间戳的记录的源(如文件)没有*源记录时间戳*的特性。
+ 此步骤是源连接器实现的一部分,不由使用源的应用程序进行参数化设定。
+
+  2. 由应用程序配置的 `TimestampAssigner` 分配最终的时间戳。
+ `TimestampAssigner` 
会查看原始的*源记录时间戳*和事件。分配器可以直接使用*源记录时间戳*或者访问事件的某个字段获得最终的事件时间戳。
+
+这种分两步的方法使用户既可以引用源系统中的时间戳,也可以引用事件数据中的时间戳作为事件时间戳。
 
-  2. The `TimestampAssigner`, which is configured by the application, assigns 
the final timestamp.
- The `TimestampAssigner` sees the original *source record timestamp* and 
the event. The assigner can use the *source record timestamp* or access a field 
of the event obtain the final event timestamp.
-  
-This two-step approach allows users to reference both timestamps from the 
source systems and timestamps in the event's data as the event timestamp.
+*注意:*当使用没有*源记录时间戳*的数据源(如文件)并选择*源记录时间戳*作为最终的事件时间戳时,事件的默认时间戳等于 `LONG_MIN` 
*(=-9,223,372,036,854,775,808)*。
 
-*Note:* When using a data source without *source record timestamps* (like 
files) and selecting the *source record timestamp* as the final event 
timestamp, events will get a default timestamp equal to `LONG_MIN` 
*(=-9,223,372,036,854,775,808)*.
+
 
- Watermark Generation
+ 水印生成
 
-Watermark Generators are only active during streaming execution. Batch 
execution deactivates Watermark Generators; all related operations described 
below become effectively no-ops.
+水印生成器仅在流执行期间会被激活。批处理执行则会停用水印生成器,则下文所述的所有相关操作实际上都变为无操作。
 
-The data source API supports running watermark generators individually *per 
split*. That allows Flink to observe the event time progress per split 
individually, which is important to handle *event time skew* properly and 
prevent *idle partitions* from holding back the event time progress of the 
entire application.
+数据源API支持*每个分片*单独运行水印生成器。这使得Flink可以分别观察每个分片的事件时间进度,这对于正确处理*事件时间偏斜*和防止*空闲分区*阻碍整个应用程序的事件时间进度是很重要的。
 
 {{< img width="80%" src="fig/per_split_watermarks.svg" alt="Watermark 
Generation in a Source with two Splits." >}}
 
-When implementing a source connector using the *Split Reader API*, this is 
automatically handled. All implementations based on the Split Reader API have 
split-aware watermarks out-of-the-box.
+使用*分片提取器 API*实现源连接器时,将自动进行处理。所有基于分片提取器 API的实现都具有开箱即用(out-of-the-box)的可识别分片的水印。

Review comment:
   使用*分片提取器 API*实现源连接器时
   -> 使用*分片阅读器(SplitReader)API* 实现源连接器时
   
   与上文的翻译保持一致

##
File path: docs/content.zh/docs/dev/datastream/sources.md
##
@@ -28,111 +28,118 @@ under the License.
 
 
 {{< hint warning >}}
-**Note**: This describes the new Data Source API, introduced in Flink 1.11 as 
part of 

[jira] [Comment Edited] (FLINK-20945) flink hive insert heap out of memory

2021-04-28 Thread George Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17335084#comment-17335084
 ] 

George Song edited comment on FLINK-20945 at 4/29/21, 1:05 AM:
---

[~aswinram92] I think we need to move/duplicate this ticket to parquet-mr 
project


was (Author: yisong):
[~aswinram92] I think we need to move this ticket to parquet-mr project

> flink hive insert heap out of memory
> 
>
> Key: FLINK-20945
> URL: https://issues.apache.org/jira/browse/FLINK-20945
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Ecosystem
> Environment: flink 1.12.0 
> hive-exec 2.3.5
>Reporter: Bruce GAO
>Priority: Major
>  Labels: stale-major
>
> When using Flink SQL to insert into Hive from Kafka, heap out of memory 
> occurs randomly.
> The Hive table uses year/month/day/hour as partitions; it seems the max heap 
> space needed corresponds to the number of active partitions (because Kafka 
> messages can be out of order and delayed), which means that if the partition 
> number increases, the heap space needed also increases and may cause the heap 
> out of memory.
> When writing a record, is it possible to take the whole heap space usage into 
> account in checkBlockSizeReached, or use some other method to avoid OOM?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-20945) flink hive insert heap out of memory

2021-04-28 Thread George Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17335084#comment-17335084
 ] 

George Song commented on FLINK-20945:
-

[~aswinram92] I think we need to move this ticket to parquet-mr project

> flink hive insert heap out of memory
> 
>
> Key: FLINK-20945
> URL: https://issues.apache.org/jira/browse/FLINK-20945
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Ecosystem
> Environment: flink 1.12.0 
> hive-exec 2.3.5
>Reporter: Bruce GAO
>Priority: Major
>  Labels: stale-major
>
> When using Flink SQL to insert into Hive from Kafka, heap out of memory 
> occurs randomly.
> The Hive table uses year/month/day/hour as partitions; it seems the max heap 
> space needed corresponds to the number of active partitions (because Kafka 
> messages can be out of order and delayed), which means that if the partition 
> number increases, the heap space needed also increases and may cause the heap 
> out of memory.
> When writing a record, is it possible to take the whole heap space usage into 
> account in checkBlockSizeReached, or use some other method to avoid OOM?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] 95chenjz commented on pull request #15767: [FLINK-22393][docs-zh] Translate the page of "Execution Mode (Batch/Streaming)" into Chinese

2021-04-28 Thread GitBox


95chenjz commented on pull request #15767:
URL: https://github.com/apache/flink/pull/15767#issuecomment-828877478


   cc @wuchong @xccui @klion26 @PatrickRen @becketqin. Could you review my PR 
when you're free? No hurry, looking forward to your feedback.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Closed] (FLINK-11681) Add an AbstractMetric to combine the metric definition and metric management.

2021-04-28 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-11681?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin closed FLINK-11681.

Resolution: Abandoned

After some discussion, we have not reached consensus on whether this is 
necessary. Closing the ticket at this point. We can revisit the metric 
abstraction if we find some issue.

> Add an AbstractMetric to combine the metric definition and metric management.
> -
>
> Key: FLINK-11681
> URL: https://issues.apache.org/jira/browse/FLINK-11681
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Metrics
>Reporter: Jiangjie Qin
>Priority: Major
>  Labels: auto-unassigned, pull-request-available
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> Currently the metric definitions are spread over many different components. 
> It would be useful to have a class that combines the metric definition and 
> management. This ticket is to create such an abstraction.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-20833) Expose pluggable interface for exception analysis and metrics reporting in Execution Graph

2021-04-28 Thread Zhenqiu Huang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17335080#comment-17335080
 ] 

Zhenqiu Huang commented on FLINK-20833:
---

[~rmetzger] [~xintongsong]
Would you please help to review this PR?

> Expose pluggable interface for  exception analysis and metrics reporting in 
> Execution Graph
> ---
>
> Key: FLINK-20833
> URL: https://issues.apache.org/jira/browse/FLINK-20833
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.12.0
>Reporter: Zhenqiu Huang
>Priority: Minor
>  Labels: auto-unassigned, pull-request-available
>
> For platform users of Apache Flink, people usually want to classify the 
> failure reasons (for example user code, networking, dependencies, etc.) for 
> Flink jobs and emit metrics for those analyzed results, so that the platform 
> can provide an accurate value for system reliability by distinguishing 
> failures due to user logic from system issues. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-10806) Support multiple consuming offsets when discovering a new topic

2021-04-28 Thread Jiangjie Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-10806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17335079#comment-17335079
 ] 

Jiangjie Qin commented on FLINK-10806:
--

After FLIP-27, the DataStream source allows users to specify arbitrary offsets 
to start reading from with a custom {{OffsetsInitializer}}. We may need to 
expose that option in the Table Source as well. Ping [~jark]
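
For reference, this is roughly what it looks like on the DataStream side with the 
FLIP-27 {{KafkaSource}} (the topic, bootstrap servers, and timestamp below are 
made-up placeholders, not values from this ticket):

{code:java}
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaStartingOffsetsExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> source =
                KafkaSource.<String>builder()
                        .setBootstrapServers("broker:9092")
                        .setTopics("new-topic")
                        // start from a timestamp instead of the earliest offset
                        .setStartingOffsets(OffsetsInitializer.timestamp(1619568000000L))
                        .setValueOnlyDeserializer(new SimpleStringSchema())
                        .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source").print();

        env.execute("starting-offsets-example");
    }
}
{code}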

> Support multiple consuming offsets when discovering a new topic
> ---
>
> Key: FLINK-10806
> URL: https://issues.apache.org/jira/browse/FLINK-10806
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.6.2, 1.8.1
>Reporter: Jiayi Liao
>Priority: Major
>  Labels: auto-unassigned
>
> In KafkaConsumerBase, we discover the TopicPartitions and compare them with 
> the restoredState. This is reasonable when a topic's partitions are scaled. 
> However, if we add a new topic which has a lot of data and restore the Flink 
> program, the data of the new topic will be consumed from the start, which may 
> not be what we want. I think this should be an option for developers.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-16634) The PartitionDiscoverer in FlinkKafkaConsumer should not use the user provided client.id.

2021-04-28 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin reassigned FLINK-16634:


Assignee: Qingsheng Ren

> The PartitionDiscoverer in FlinkKafkaConsumer should not use the user 
> provided client.id.
> -
>
> Key: FLINK-16634
> URL: https://issues.apache.org/jira/browse/FLINK-16634
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.10.0
>Reporter: Jiangjie Qin
>Assignee: Qingsheng Ren
>Priority: Major
>  Labels: auto-unassigned
>
> The {{PartitionDiscoverer}} creates a {{KafkaConsumer}} using the client.id 
> from the user provided properties. This may cause the MBean to collide with 
> the fetching {{KafkaConsumer}}. The {{PartitionDiscoverer}} should use a 
> unique client.id instead, such as "PartitionDiscoverer-RANDOM_LONG"
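
A minimal sketch of the suggested fix (the class and variable names below are 
illustrative, not the actual Flink change):

{code:java}
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;
import java.util.concurrent.ThreadLocalRandom;

public final class DiscovererClientIds {

    /** Returns a copy of the user properties with a unique client.id for the discoverer. */
    public static Properties withUniqueClientId(Properties userProvidedProperties) {
        Properties discovererProps = new Properties();
        discovererProps.putAll(userProvidedProperties);
        // Override client.id so the discoverer's consumer does not collide with the
        // fetching consumer's MBean name.
        discovererProps.setProperty(
                ConsumerConfig.CLIENT_ID_CONFIG,
                "PartitionDiscoverer-" + ThreadLocalRandom.current().nextLong());
        return discovererProps;
    }
}
{code}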



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-20108) SingleThreadFetcherManager may add splits to a shutting down SplitFetcher

2021-04-28 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20108?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin closed FLINK-20108.

Fix Version/s: (was: 1.11.4)
   1.11.3
   Resolution: Fixed

FLINK-18128 has been backported to 1.11.3.

e72e48533902fe6a7271310736584e77b64d05b8

> SingleThreadFetcherManager may add splits to a shutting down SplitFetcher
> -
>
> Key: FLINK-20108
> URL: https://issues.apache.org/jira/browse/FLINK-20108
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.11.2
>Reporter: Jiangjie Qin
>Priority: Major
>  Labels: auto-unassigned
> Fix For: 1.11.3
>
>
> Currently the split fetchers are only removed from 
> {{SplitFetcherManager.fetchers}} when the thread exits. This may cause problems 
> because when {{SplitFetcherManager.addSplits()}} is called, it may see a 
> shutting-down split fetcher and add splits to it. These splits will then 
> just be lost.
> This issue is actually already fixed in FLINK-18128. The fix needs to be 
> cherry-picked to 1.11.3.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22516) ResourceManager cannot establish leadership

2021-04-28 Thread Ricky Burnett (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ricky Burnett updated FLINK-22516:
--
Affects Version/s: 1.9.3

> ResourceManager cannot establish leadership
> ---
>
> Key: FLINK-22516
> URL: https://issues.apache.org/jira/browse/FLINK-22516
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.9.3
> Environment: 1.9.3 Flink version, on kubernetes.
>Reporter: Ricky Burnett
>Priority: Major
> Attachments: jobmanager_leadership.log
>
>
> We are running Flink clusters with 2 Jobmanagers in HA mode. After a 
> Zookeeper restart, the two JMs begin leadership election and end up in a state 
> where they are both trying to start their ResourceManager, until one of them 
> writes to `leader//resource_manager_lock` and the other Jobmanager's 
> JobMaster proceeds to execute `notifyOfNewResourceManagerLeader`, which 
> restarts the ResourceManager. This in turn writes to 
> `leader//resource_manager_lock`, which triggers the first JobMaster to 
> restart its ResourceManager. We can see this in the logs from the 
> "ResourceManager leader changed to new address" log, which goes back and forth 
> between the two JMs and the two IP addresses. This cycle appears to continue 
> indefinitely without outside interruption.  
> I've attached combined logs from two JMs in our environment that got into 
> this state.  The logs start with the loss of connection and end with a couple 
> of cycles of back and forth.   The two relevant hosts are 
> "flink-jm-828d4aa2-d4d4-457b-995d-feb56d08c1fb-784cdb9c57-tsxb7" and 
> "flink-jm-828d4aa2-d4d4-457b-995d-feb56d08c1fb-784cdb9c57-mpf9x".
> *-tsxb7 appears to be the last host that was granted leadership. 
> {code:java}
> {"thread":"Curator-Framework-0-EventThread","level":"INFO","loggerName":"org.apache.flink.runtime.jobmaster.JobManagerRunner","message":"JobManager
>  runner for job tenant: ssademo, pipeline: 
> 828d4aa2-d4d4-457b-995d-feb56d08c1fb, name: integration-test-detection 
> (33e12948df69077ab3b33316eacbb5e4) was granted leadership with session id 
> 97992805-9c60-40ba-8260-aaf036694cde at 
> akka.tcp://flink@100.97.92.73:6123/user/jobmanager_3.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","instant":{"epochSecond":1617129712,"nanoOfSecond":44700},"contextMap":{},"threadId":152,"threadPriority":5,"source":{"class":"org.apache.flink.runtime.jobmaster.JobManagerRunner","method":"startJobMaster","file":"JobManagerRunner.java","line":313},"service":"streams","time":"2021-03-30T18:41:52.447UTC","hostname":"flink-jm-828d4aa2-d4d4-457b-995d-feb56d08c1fb-784cdb9c57-tsxb7"}
> {code}
> But  *-mpf9x continues to try to wrestle control back.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22516) ResourceManager cannot establish leadership

2021-04-28 Thread Ricky Burnett (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ricky Burnett updated FLINK-22516:
--
Environment: 1.9.3 Flink version, on kubernetes.

> ResourceManager cannot establish leadership
> ---
>
> Key: FLINK-22516
> URL: https://issues.apache.org/jira/browse/FLINK-22516
> Project: Flink
>  Issue Type: Bug
> Environment: 1.9.3 Flink version, on kubernetes.
>Reporter: Ricky Burnett
>Priority: Major
> Attachments: jobmanager_leadership.log
>
>
> We are running Flink clusters with 2 Jobmanagers in HA mode. After a 
> Zookeeper restart, the two JMs begin leadership election and end up in a state 
> where they are both trying to start their ResourceManager, until one of them 
> writes to `leader//resource_manager_lock` and the other Jobmanager's 
> JobMaster proceeds to execute `notifyOfNewResourceManagerLeader`, which 
> restarts the ResourceManager. This in turn writes to 
> `leader//resource_manager_lock`, which triggers the first JobMaster to 
> restart its ResourceManager. We can see this in the logs from the 
> "ResourceManager leader changed to new address" log, which goes back and forth 
> between the two JMs and the two IP addresses. This cycle appears to continue 
> indefinitely without outside interruption.  
> I've attached combined logs from two JMs in our environment that got into 
> this state.  The logs start with the loss of connection and end with a couple 
> of cycles of back and forth.   The two relevant hosts are 
> "flink-jm-828d4aa2-d4d4-457b-995d-feb56d08c1fb-784cdb9c57-tsxb7" and 
> "flink-jm-828d4aa2-d4d4-457b-995d-feb56d08c1fb-784cdb9c57-mpf9x".
> *-tsxb7 appears to be the last host that was granted leadership. 
> {code:java}
> {"thread":"Curator-Framework-0-EventThread","level":"INFO","loggerName":"org.apache.flink.runtime.jobmaster.JobManagerRunner","message":"JobManager
>  runner for job tenant: ssademo, pipeline: 
> 828d4aa2-d4d4-457b-995d-feb56d08c1fb, name: integration-test-detection 
> (33e12948df69077ab3b33316eacbb5e4) was granted leadership with session id 
> 97992805-9c60-40ba-8260-aaf036694cde at 
> akka.tcp://flink@100.97.92.73:6123/user/jobmanager_3.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","instant":{"epochSecond":1617129712,"nanoOfSecond":44700},"contextMap":{},"threadId":152,"threadPriority":5,"source":{"class":"org.apache.flink.runtime.jobmaster.JobManagerRunner","method":"startJobMaster","file":"JobManagerRunner.java","line":313},"service":"streams","time":"2021-03-30T18:41:52.447UTC","hostname":"flink-jm-828d4aa2-d4d4-457b-995d-feb56d08c1fb-784cdb9c57-tsxb7"}
> {code}
> But  *-mpf9x continues to try to wrestle control back.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] galenwarren commented on a change in pull request #15599: [FLINK-11838][flink-gs-fs-hadoop] Create Google Storage file system with recoverable writer support

2021-04-28 Thread GitBox


galenwarren commented on a change in pull request #15599:
URL: https://github.com/apache/flink/pull/15599#discussion_r622618082



##
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriterCommitter.java
##
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.writer;
+
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.fs.gs.GSFileSystemOptions;
+import org.apache.flink.fs.gs.storage.BlobStorage;
+import org.apache.flink.fs.gs.utils.BlobUtils;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.storage.BlobId;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+/** The committer for the GS recoverable writer. */
+class GSRecoverableWriterCommitter implements 
RecoverableFsDataOutputStream.Committer {
+
+/** The underlying blob storage. */
+private final BlobStorage storage;
+
+/** The GS file system options. */
+private final GSFileSystemOptions options;
+
+/** The recoverable writer instance. */
+private final GSRecoverableWriter writer;
+
+/** The recoverable writer state for the commit operation. */
+private final GSRecoverableWriterState state;
+
+GSRecoverableWriterCommitter(
+BlobStorage storage,
+GSFileSystemOptions options,
+GSRecoverableWriter writer,
+GSRecoverableWriterState state) {
+this.storage = Preconditions.checkNotNull(storage);
+this.options = Preconditions.checkNotNull(options);
+this.writer = Preconditions.checkNotNull(writer);
+this.state = Preconditions.checkNotNull(state);
+}
+
+@Override
+public void commit() throws IOException {
+
+// compose all the component blob ids into the final blob id. if the 
component blob ids are
+// in the same bucket as the final blob id, this can be done directly. 
otherwise, we must
+// compose to a new temporary blob id in the same bucket as the 
component blob ids and
+// then copy that blob to the final blob location
+if 
(state.finalBlobId.getBucket().equals(state.getTemporaryBucketName(options))) {
+
+// compose directly to final blob
+composeBlobs(
+state.getComponentBlobIds(options),
+state.finalBlobId,
+options.writerContentType);
+
+} else {
+
+// compose to a temporary blob id, then copy to final blob id
+BlobId intermediateBlobId = state.createTemporaryBlobId(options);
+composeBlobs(
+state.getComponentBlobIds(options),
+intermediateBlobId,
+options.writerContentType);
+storage.copy(intermediateBlobId, state.finalBlobId);
+}
+
+// clean up after commit
+writer.cleanupRecoverableState(state);

Review comment:
   One more thought on recovering from earlier checkpoints.
   
   Another way to make this possible would be not to do any cleanup in 
```commit``` or ```cleanupRecoverableState``` and leave the responsibility for 
cleanup of temporary blobs to the caller. The most obvious way for the caller 
to do this would be to specify a separate bucket for temporary blobs and then 
to apply a TTL.
   
   This would make things nice and simple -- temporary files would stay around 
for the TTL period, and it would be the responsibility of the caller to choose 
an appropriate TTL to balance recovery and storage needs. The ability to 
recover wouldn't depend on whether ```commit``` or 
```cleanupRecoverableState``` or any other method had been called at some 
point, it would just depend on the age of the temporary blobs. This actually 
strikes me as a nice configuration, one that I might like to use.
   
   The downside, of course, would be that if this were the default 
configuration, then callers who write to GS files without supplying any 
additional configuration (i.e. no options) would leak temporary blobs. If this 
were not the 

[GitHub] [flink] flinkbot edited a comment on pull request #15797: [FLINK-22484][table] Add built-in MAP_KEYS, MAP_VALUES, MAP_FROM_ARRA…

2021-04-28 Thread GitBox


flinkbot edited a comment on pull request #15797:
URL: https://github.com/apache/flink/pull/15797#issuecomment-828531728


   
   ## CI report:
   
   * 9208f004adafc3997d91921efe3e7d291eb8360f Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17362)
 
   * 32b72459e66decd79269d3718f1ee40c3d765775 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17372)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #15797: [FLINK-22484][table] Add built-in MAP_KEYS, MAP_VALUES, MAP_FROM_ARRA…

2021-04-28 Thread GitBox


flinkbot edited a comment on pull request #15797:
URL: https://github.com/apache/flink/pull/15797#issuecomment-828531728


   
   ## CI report:
   
   * 9208f004adafc3997d91921efe3e7d291eb8360f Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17362)
 
   * 32b72459e66decd79269d3718f1ee40c3d765775 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-22516) ResourceManager cannot establish leadership

2021-04-28 Thread Ricky Burnett (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ricky Burnett updated FLINK-22516:
--
Description: 
We are running Flink clusters with 2 Jobmanagers in HA mode. After a Zookeeper 
restart, the two JMs begin leadership election and end up in a state where they are 
both trying to start their ResourceManager, until one of them writes to 
`leader//resource_manager_lock` and the other Jobmanager's JobMaster 
proceeds to execute `notifyOfNewResourceManagerLeader`, which restarts the 
ResourceManager. This in turn writes to `leader//resource_manager_lock`, 
which triggers the first JobMaster to restart its ResourceManager. We can see 
this in the logs from the "ResourceManager leader changed to new address" log, 
which goes back and forth between the two JMs and the two IP addresses. This 
cycle appears to continue indefinitely without outside interruption.  

I've attached combined logs from two JMs in our environment that got into this 
state.  The logs start with the loss of connection and end with a couple of 
cycles of back and forth.   The two relevant hosts are 
"flink-jm-828d4aa2-d4d4-457b-995d-feb56d08c1fb-784cdb9c57-tsxb7" and 
"flink-jm-828d4aa2-d4d4-457b-995d-feb56d08c1fb-784cdb9c57-mpf9x".

*-tsxb7 appears to be the last host that was granted leadership. 
{code:java}
{"thread":"Curator-Framework-0-EventThread","level":"INFO","loggerName":"org.apache.flink.runtime.jobmaster.JobManagerRunner","message":"JobManager
 runner for job tenant: ssademo, pipeline: 
828d4aa2-d4d4-457b-995d-feb56d08c1fb, name: integration-test-detection 
(33e12948df69077ab3b33316eacbb5e4) was granted leadership with session id 
97992805-9c60-40ba-8260-aaf036694cde at 
akka.tcp://flink@100.97.92.73:6123/user/jobmanager_3.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","instant":{"epochSecond":1617129712,"nanoOfSecond":44700},"contextMap":{},"threadId":152,"threadPriority":5,"source":{"class":"org.apache.flink.runtime.jobmaster.JobManagerRunner","method":"startJobMaster","file":"JobManagerRunner.java","line":313},"service":"streams","time":"2021-03-30T18:41:52.447UTC","hostname":"flink-jm-828d4aa2-d4d4-457b-995d-feb56d08c1fb-784cdb9c57-tsxb7"}
{code}
But  *-mpf9x continues to try to wrestle control back.

  was:
We are running Flink clusters with 2 Jobmanagers in HA mode. After a Zookeeper 
restart, the two JMs begin leadership election and end up in a state where they are 
both trying to start their ResourceManager, until one of them writes to 
`leader//resource_manager_lock` and the JobMaster proceeds to execute 
`notifyOfNewResourceManagerLeader`, which restarts the ResourceManager. This in 
turn writes to `leader//resource_manager_lock`, which triggers the other 
JobMaster to restart its ResourceManager. We can see this in the logs from 
the "ResourceManager leader changed to new address" log, which goes back and 
forth between the two JMs and the two IP addresses. This cycle appears to 
continue indefinitely without outside interruption.  

I've attached combined logs from two JMs in our environment that got into this 
state.  The logs start with the loss of connection and end with a couple of 
cycles of back and forth.   The two relevant hosts are 
"flink-jm-828d4aa2-d4d4-457b-995d-feb56d08c1fb-784cdb9c57-tsxb7" and 
"flink-jm-828d4aa2-d4d4-457b-995d-feb56d08c1fb-784cdb9c57-mpf9x".

*-tsxb7 appears to be the last host that was granted leadership. 
{code:java}
{"thread":"Curator-Framework-0-EventThread","level":"INFO","loggerName":"org.apache.flink.runtime.jobmaster.JobManagerRunner","message":"JobManager
 runner for job tenant: ssademo, pipeline: 
828d4aa2-d4d4-457b-995d-feb56d08c1fb, name: integration-test-detection 
(33e12948df69077ab3b33316eacbb5e4) was granted leadership with session id 
97992805-9c60-40ba-8260-aaf036694cde at 
akka.tcp://flink@100.97.92.73:6123/user/jobmanager_3.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","instant":{"epochSecond":1617129712,"nanoOfSecond":44700},"contextMap":{},"threadId":152,"threadPriority":5,"source":{"class":"org.apache.flink.runtime.jobmaster.JobManagerRunner","method":"startJobMaster","file":"JobManagerRunner.java","line":313},"service":"streams","time":"2021-03-30T18:41:52.447UTC","hostname":"flink-jm-828d4aa2-d4d4-457b-995d-feb56d08c1fb-784cdb9c57-tsxb7"}
{code}
But  *-mpf9x continues to try to wrestle control back.


> ResourceManager cannot establish leadership
> ---
>
> Key: FLINK-22516
> URL: https://issues.apache.org/jira/browse/FLINK-22516
> Project: Flink
>  Issue Type: Bug
>Reporter: Ricky Burnett
>Priority: Major
> Attachments: jobmanager_leadership.log
>
>
> We are running Flink clusters with 2 Jobmanagers in HA mode.  After a 
> Zookeeper restart the two JMs begin leadership 

[jira] [Created] (FLINK-22516) ResourceManager cannot establish leadership

2021-04-28 Thread Ricky Burnett (Jira)
Ricky Burnett created FLINK-22516:
-

 Summary: ResourceManager cannot establish leadership
 Key: FLINK-22516
 URL: https://issues.apache.org/jira/browse/FLINK-22516
 Project: Flink
  Issue Type: Bug
Reporter: Ricky Burnett
 Attachments: jobmanager_leadership.log

We are running Flink clusters with 2 Jobmanagers in HA mode. After a Zookeeper 
restart, the two JMs begin leadership election and end up in a state where they are 
both trying to start their ResourceManager, until one of them writes to 
`leader//resource_manager_lock` and the JobMaster proceeds to execute 
`notifyOfNewResourceManagerLeader`, which restarts the ResourceManager. This in 
turn writes to `leader//resource_manager_lock`, which triggers the other 
JobMaster to restart its ResourceManager. We can see this in the logs from 
the "ResourceManager leader changed to new address" log, which goes back and 
forth between the two JMs and the two IP addresses. This cycle appears to 
continue indefinitely without outside interruption.  

I've attached combined logs from two JMs in our environment that got into this 
state.  The logs start with the loss of connection and end with a couple of 
cycles of back and forth.   The two relevant hosts are 
"flink-jm-828d4aa2-d4d4-457b-995d-feb56d08c1fb-784cdb9c57-tsxb7" and 
"flink-jm-828d4aa2-d4d4-457b-995d-feb56d08c1fb-784cdb9c57-mpf9x".

*-tsxb7 appears to be the last host that was granted leadership. 
{code:java}
{"thread":"Curator-Framework-0-EventThread","level":"INFO","loggerName":"org.apache.flink.runtime.jobmaster.JobManagerRunner","message":"JobManager
 runner for job tenant: ssademo, pipeline: 
828d4aa2-d4d4-457b-995d-feb56d08c1fb, name: integration-test-detection 
(33e12948df69077ab3b33316eacbb5e4) was granted leadership with session id 
97992805-9c60-40ba-8260-aaf036694cde at 
akka.tcp://flink@100.97.92.73:6123/user/jobmanager_3.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","instant":{"epochSecond":1617129712,"nanoOfSecond":44700},"contextMap":{},"threadId":152,"threadPriority":5,"source":{"class":"org.apache.flink.runtime.jobmaster.JobManagerRunner","method":"startJobMaster","file":"JobManagerRunner.java","line":313},"service":"streams","time":"2021-03-30T18:41:52.447UTC","hostname":"flink-jm-828d4aa2-d4d4-457b-995d-feb56d08c1fb-784cdb9c57-tsxb7"}
{code}
But  *-mpf9x continues to try to wrestle control back.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on pull request #15797: [FLINK-22484][table] Add built-in MAP_KEYS, MAP_VALUES, MAP_FROM_ARRA…

2021-04-28 Thread GitBox


flinkbot edited a comment on pull request #15797:
URL: https://github.com/apache/flink/pull/15797#issuecomment-828531728


   
   ## CI report:
   
   * 9208f004adafc3997d91921efe3e7d291eb8360f Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17362)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #15798: [FLINK-22168][table] Fix several shortcomings that prevent schema expressions

2021-04-28 Thread GitBox


flinkbot edited a comment on pull request #15798:
URL: https://github.com/apache/flink/pull/15798#issuecomment-828566333


   
   ## CI report:
   
   * d02069bb5bff8be1d10a04cc981caa01fb3919ab Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17364)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #15631: [FLINK-20103][tests] Add UnalignedCheckpointsStressITCase

2021-04-28 Thread GitBox


flinkbot edited a comment on pull request #15631:
URL: https://github.com/apache/flink/pull/15631#issuecomment-820458718


   
   ## CI report:
   
   * 8db656d8d783b61761d3de7727c0e8831c5e77cd Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17361)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #15479: [FLINK-19606][table-runtime-blink] Introduce StreamExecWindowJoin and window join it cases

2021-04-28 Thread GitBox


flinkbot edited a comment on pull request #15479:
URL: https://github.com/apache/flink/pull/15479#issuecomment-812344008


   
   ## CI report:
   
   * 1b562e2eec41f48d9ac9b4e7591240b6c8d99e61 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17360)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (FLINK-22515) Add Documentation for Flink Glue Schema Registry Integration

2021-04-28 Thread Linyu Yao (Jira)
Linyu Yao created FLINK-22515:
-

 Summary: Add Documentation for Flink Glue Schema Registry 
Integration
 Key: FLINK-22515
 URL: https://issues.apache.org/jira/browse/FLINK-22515
 Project: Flink
  Issue Type: New Feature
  Components: Documentation
Reporter: Linyu Yao


Add documentation for Flink Glue Schema Registry integration to the page of 
Kafka Connector and Kinesis Connector:

[https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/kafka.html#apache-kafka-connector]

[https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/kinesis.html]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink-web] XComp commented on a change in pull request #436: Add Apache Flink release 1.13.0

2021-04-28 Thread GitBox


XComp commented on a change in pull request #436:
URL: https://github.com/apache/flink-web/pull/436#discussion_r622363151



##
File path: _posts/2021-04-22-release-1.13.0.md
##
@@ -0,0 +1,483 @@
+---
+layout: post 
+title:  "Apache Flink 1.13.0 Release Announcement"
+date: 2021-04-22T08:00:00.000Z 
+categories: news 
+authors:
+- stephan:
+  name: "Stephan Ewen"
+  twitter: "StephanEwen"
+- dwysakowicz:
+  name: "Dawid Wysakowicz"
+  twitter: "dwysakowicz"
+
+excerpt: The Apache Flink community is excited to announce the release of 
Flink 1.13.0! Close to xxx contributors worked on over xxx threads to bring 
significant improvements to usability and observability as well as new features 
that improve elasticity of Flink’s Application-style deployments.
+---
+
+
+The Apache Flink community is excited to announce the release of Flink 1.13.0! 
More than 200
+contributors worked on over 1k threads to bring significant improvements to 
usability and
+observability as well as new features that improve elasticity of Flink’s 
Application-style
+deployments.
+
+This release brings us a big step forward in one of our major efforts: Making 
Stream Processing
+Applications as natural and as simple to manage as any other application. The 
new reactive scaling
+mode means that scaling streaming applications in and out now works like in 
any other application,
+by just changing the number of parallel processes.
+
+We also added a series of improvements that help users better understand the 
performance of
+applications. When the streams don't flow as fast as you’d hope, these can 
help you to understand
+why: Load and backpressure visualization to identify bottlenecks, CPU flame 
graphs to identify hot
+code paths in your application, and State Access Latencies to see how the 
State Backends are keeping
+up.
+
+This blog post describes all major new features and improvements, important 
changes to be aware of
+and what to expect moving forward.
+
+{% toc %}
+
+We encourage you to [download the 
release](https://flink.apache.org/downloads.html) and share your
+feedback with the community through
+the [Flink mailing 
lists](https://flink.apache.org/community.html#mailing-lists)
+or [JIRA](https://issues.apache.org/jira/projects/FLINK/summary).
+
+## Notable Features and Improvements
+
+### Reactive mode
+
+The Reactive Mode is the latest piece in Flink's initiative for making Stream 
Processing
+Applications as natural and as simple to manage as any other application.
+
+Flink has a dual nature when it comes to resource management and deployments: 
You can deploy
+clusters onto Resource Managers like Kubernetes or Yarn in such a way that 
Flink actively manages
+the resource, and allocates and releases workers as needed. That is especially 
useful for jobs and
+applications that rapidly change their required resources, like batch 
applications and ad-hoc SQL
+queries. The application's parallelism leads, and the number of workers follows. We call this
+active scaling.
+
+For long-running streaming applications, it is often a nicer model to just deploy them like any
+other long-running application: the application doesn't really need to know that it runs on K8s,
+EKS, Yarn, etc. and doesn't try to acquire a specific number of workers; instead, it just uses the
+number of workers that is given to it. The number of workers leads, and the application's
+parallelism adjusts to it. We call this reactive scaling.
+
+The Application Deployment Mode started this effort, making deployments application-like
+(avoiding the separate deployment steps of (1) starting a cluster and (2) submitting an
+application). The reactive scheduler completes this: you no longer need extra tools (scripts
+or a K8s operator) to keep the number of workers and the application parallelism settings in
+sync.
+
+You can now put an auto-scaler around Flink applications like around other 
typical applications — as
+long as you are mindful when configuring the autoscaler that stateful 
applications still spend
+effort in moving state around when scaling.
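+
+As an illustrative sketch (the option names below assume Flink 1.13's configuration keys, and are
+equivalent to setting `scheduler-mode: reactive` in `flink-conf.yaml`), Reactive Mode can be
+enabled like this:
+
+```java
+// Sketch only: enable the reactive scheduler mode via the configuration
+// (classes from org.apache.flink.configuration).
+Configuration config = new Configuration();
+config.set(JobManagerOptions.SCHEDULER_MODE, SchedulerExecutionMode.REACTIVE);
+```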
+
+
+### Bottleneck detection, Backpressure and Idleness Monitoring
+
+One of the most important metrics to investigate when a job does not consume 
records as fast as you
+would expect is the backpressure ratio. It lets you track down bottlenecks in 
your pipelines. The
+current mechanism had two limitations: it was heavy, because it worked by repeatedly taking
+stack trace samples of your running tasks, and it was difficult to find out which vertex was the
+source of backpressure. In Flink 1.13, we reworked
+the mechanism to include new metrics for the time tasks spend being 
backpressured, along with a
+reworked graphical representation of the job (including a percentage of time 
particular vertices are
+backpressured).
+
+
+
+  
+
+
+### Support for CPU flame graphs in Web UI
+
+It is desirable to provide better visibility into the distribution of CPU 
resources while 

[jira] [Commented] (FLINK-22513) Failed to upload compile artifacts

2021-04-28 Thread Robert Metzger (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17334913#comment-17334913
 ] 

Robert Metzger commented on FLINK-22513:


Let's hope this is just a temporary glitch in the infrastructure

> Failed to upload compile artifacts
> --
>
> Key: FLINK-22513
> URL: https://issues.apache.org/jira/browse/FLINK-22513
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / Azure Pipelines
>Affects Versions: 1.13.0
>Reporter: Dawid Wysakowicz
>Priority: Major
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=17343=logs=52b61abe-a3cc-5bde-cc35-1bbe89bb7df5=5e2d176c-a6d4-5b54-7808-b11714e363ad



--
This message was sent by Atlassian Jira
(v8.3.4#803005)



[GitHub] [flink] flinkbot edited a comment on pull request #15796: [FLINK-22511][python] Fix the bug of non-composite result type in Python TableAggregateFunction

2021-04-28 Thread GitBox


flinkbot edited a comment on pull request #15796:
URL: https://github.com/apache/flink/pull/15796#issuecomment-828441475


   
   ## CI report:
   
   * 4c3e94df03863bf43eac7b47be51764f2f4be659 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17352)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Closed] (FLINK-11158) Do not show empty page for unknown jobs

2021-04-28 Thread Nico Kruber (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-11158?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber closed FLINK-11158.
---
Resolution: Fixed

Actually, I think this has been solved by now, since the Web UI presents a 
"job not found" popup when you try to access a non-existent job.

> Do not show empty page for unknown jobs
> ---
>
> Key: FLINK-11158
> URL: https://issues.apache.org/jira/browse/FLINK-11158
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Affects Versions: 1.5.5, 1.6.2, 1.7.0
>Reporter: Nico Kruber
>Priority: Major
>  Labels: auto-unassigned
>
> If you try to access the Web UI using an old/non-existing job ID, e.g. after 
> a cluster restart, you are currently presented with a white page with no 
> further info. It should at least contain a "job not found" message to indicate 
> that there was no other error.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on pull request #15121: The method $(String) is undefined for the type TableExample

2021-04-28 Thread GitBox


flinkbot edited a comment on pull request #15121:
URL: https://github.com/apache/flink/pull/15121#issuecomment-793480240


   
   ## CI report:
   
   * cf26ce895a7956d258acc073817f578558e78227 UNKNOWN
   * 709c4110370b845f42d916a06e56c6026cf2fac8 UNKNOWN
   * 4ea99332e1997eebca1f3f0a9d9229b8265fe32c UNKNOWN
   * 9a256ec6b644a0f795376076f4e3ff4e85aaad1e UNKNOWN
   * b7dca63a2e98f09c8c780a21091b5268d897b220 UNKNOWN
   * 2b7bb05a4dbbb0883db566589dfe1975fdbb279d UNKNOWN
   * 00d3356d86576e2a129405a92f9ebb2acb5d6412 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17358)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] galenwarren commented on a change in pull request #15599: [FLINK-11838][flink-gs-fs-hadoop] Create Google Storage file system with recoverable writer support

2021-04-28 Thread GitBox


galenwarren commented on a change in pull request #15599:
URL: https://github.com/apache/flink/pull/15599#discussion_r622377812



##
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriterCommitter.java
##
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.writer;
+
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.fs.gs.GSFileSystemOptions;
+import org.apache.flink.fs.gs.storage.BlobStorage;
+import org.apache.flink.fs.gs.utils.BlobUtils;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.storage.BlobId;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+/** The committer for the GS recoverable writer. */
+class GSRecoverableWriterCommitter implements 
RecoverableFsDataOutputStream.Committer {
+
+/** The underlying blob storage. */
+private final BlobStorage storage;
+
+/** The GS file system options. */
+private final GSFileSystemOptions options;
+
+/** The recoverable writer instance. */
+private final GSRecoverableWriter writer;
+
+/** The recoverable writer state for the commit operation. */
+private final GSRecoverableWriterState state;
+
+GSRecoverableWriterCommitter(
+BlobStorage storage,
+GSFileSystemOptions options,
+GSRecoverableWriter writer,
+GSRecoverableWriterState state) {
+this.storage = Preconditions.checkNotNull(storage);
+this.options = Preconditions.checkNotNull(options);
+this.writer = Preconditions.checkNotNull(writer);
+this.state = Preconditions.checkNotNull(state);
+}
+
+@Override
+public void commit() throws IOException {
+
+// compose all the component blob ids into the final blob id. if the 
component blob ids are
+// in the same bucket as the final blob id, this can be done directly. 
otherwise, we must
+// compose to a new temporary blob id in the same bucket as the 
component blob ids and
+// then copy that blob to the final blob location
+if 
(state.finalBlobId.getBucket().equals(state.getTemporaryBucketName(options))) {
+
+// compose directly to final blob
+composeBlobs(
+state.getComponentBlobIds(options),
+state.finalBlobId,
+options.writerContentType);
+
+} else {
+
+// compose to a temporary blob id, then copy to final blob id
+BlobId intermediateBlobId = state.createTemporaryBlobId(options);
+composeBlobs(
+state.getComponentBlobIds(options),
+intermediateBlobId,
+options.writerContentType);
+storage.copy(intermediateBlobId, state.finalBlobId);
+}
+
+// clean up after commit
+writer.cleanupRecoverableState(state);

Review comment:
   Yes, agreed. I've set up a few test cases locally and watched how the 
various methods (```cleanupRecoverableState```, ```commit```, etc.) get called 
using both a 
[DefaultRollingPolicy](https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.html)
 and a 
[CheckpointRollingPolicy](https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/CheckpointRollingPolicy.html)
 with a 
[StreamingFileSink](https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html).
 I think I see what's going on here -- I'll share my findings with you and 
please let me know what you think.
   
   First, a class of interest is the 
[Bucket](https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java)
 class, which is written to by a streaming file sink and is responsible for 
interacting with the underlying storage, via a 
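   For illustration, a minimal local probe of the kind described above might look roughly like
this (class name, path, and wiring are assumptions, not the PR's actual tests):

```java
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

public class RollingPolicyProbe {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // commit() is checkpoint-driven, so enable checkpointing to observe it being called.
        env.enableCheckpointing(10_000);

        StreamingFileSink<String> sink =
                StreamingFileSink.forRowFormat(
                                new Path("gs://my-bucket/out"),
                                new SimpleStringEncoder<String>("UTF-8"))
                        // Swap in OnCheckpointRollingPolicy.build() to roll on every checkpoint.
                        .withRollingPolicy(DefaultRollingPolicy.builder().build())
                        .build();

        env.fromElements("a", "b", "c").addSink(sink);
        env.execute("rolling-policy-probe");
    }
}
```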

[GitHub] [flink] galenwarren commented on a change in pull request #15599: [FLINK-11838][flink-gs-fs-hadoop] Create Google Storage file system with recoverable writer support

2021-04-28 Thread GitBox


galenwarren commented on a change in pull request #15599:
URL: https://github.com/apache/flink/pull/15599#discussion_r622380775



##
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriterState.java
##
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.writer;
+
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.fs.gs.GSFileSystemOptions;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.storage.BlobId;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/** The state of a recoverable write. */
+class GSRecoverableWriterState implements RecoverableWriter.ResumeRecoverable, 
Cloneable {
+
+/** The blob id to which the recoverable write operation is writing. */
+public final BlobId finalBlobId;
+
+/** The number of bytes that have been written so far. */
+public long bytesWritten;
+
+/** Indicates if the write has been closed. */
+public boolean closed;
+
+/** The object ids for the temporary objects that should be composed to 
form the final blob. */
+public final List componentObjectIds;
+
+GSRecoverableWriterState(
+BlobId finalBlobId, long bytesWritten, boolean closed, List 
componentObjectIds) {
+this.finalBlobId = Preconditions.checkNotNull(finalBlobId);
+Preconditions.checkArgument(bytesWritten >= 0);
+this.bytesWritten = bytesWritten;
+this.closed = closed;
+
+// shallow copy the component object ids to ensure this state object 
exclusively
+// manages the list of component object ids
+this.componentObjectIds = new 
ArrayList<>(Preconditions.checkNotNull(componentObjectIds));
+}
+
+GSRecoverableWriterState(GSRecoverableWriterState state) {
+this(state.finalBlobId, state.bytesWritten, state.closed, 
state.componentObjectIds);
+}
+
+GSRecoverableWriterState(BlobId finalBlobId) {
+this(finalBlobId, 0, false, new ArrayList<>());
+}
+
+/**
+ * Returns the temporary bucket name. If options specifies a temporary 
bucket name, we use that
+ * one; otherwise, we use the bucket name of the final blob.
+ *
+ * @param options The GS file system options
+ * @return The temporary bucket name
+ */
+String getTemporaryBucketName(GSFileSystemOptions options) {
+return options.writerTemporaryBucketName.isEmpty()
+? finalBlobId.getBucket()
+: options.writerTemporaryBucketName;
+}

Review comment:
   Yes -- agreed, will do.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] galenwarren commented on pull request #15599: [FLINK-11838][flink-gs-fs-hadoop] Create Google Storage file system with recoverable writer support

2021-04-28 Thread GitBox


galenwarren commented on pull request #15599:
URL: https://github.com/apache/flink/pull/15599#issuecomment-828625909


   @rmetzger Thanks for your help diagnosing the build issue


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] todd5167 commented on a change in pull request #15755: [FLINK-22318][table] Support RENAME column name for ALTER TABLE state…

2021-04-28 Thread GitBox


todd5167 commented on a change in pull request #15755:
URL: https://github.com/apache/flink/pull/15755#discussion_r622333275



##
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
##
@@ -144,6 +149,116 @@ public static Operation convertChangeColumn(
 // TODO: handle watermark and constraints
 }
 
+public static Operation convertRenameColumn(
+ObjectIdentifier tableIdentifier,
+String originColumnName,
+String newColumnName,
+CatalogTable catalogTable) {
+
+Schema modifiedTableSchema = catalogTable.getUnresolvedSchema();
+validateColumnName(originColumnName, newColumnName, 
modifiedTableSchema);
+
+Schema.Builder builder = Schema.newBuilder();
+// build column
+modifiedTableSchema.getColumns().stream()
+.forEach(
+column -> {
+if (StringUtils.equals(column.getName(), 
originColumnName)) {
+buildNewColumnFromOriginColumn(builder, 
column, newColumnName);
+} else {
+buildNewColumnFromOriginColumn(builder, 
column, column.getName());
+}
+});
+// build primary key column
+List<String> originPrimaryKeyNames =
+modifiedTableSchema
+.getPrimaryKey()
+.map(Schema.UnresolvedPrimaryKey::getColumnNames)
+.orElseGet(Lists::newArrayList);
+
+List<String> newPrimaryKeyNames =
+originPrimaryKeyNames.stream()
+.map(
+pkName ->
+StringUtils.equals(pkName, 
originColumnName)
+? newColumnName
+: pkName)
+.collect(Collectors.toList());
+
+if (newPrimaryKeyNames.size() > 0) {
+builder.primaryKey(newPrimaryKeyNames);
+}
+// build watermark
+modifiedTableSchema.getWatermarkSpecs().stream()
+.forEach(
+watermarkSpec -> {
+String watermarkRefColumnName = 
watermarkSpec.getColumnName();
+Expression watermarkExpression = 
watermarkSpec.getWatermarkExpression();
+if (StringUtils.equals(watermarkRefColumnName, 
originColumnName)) {
+String newWatermarkExpression =
+((SqlCallExpression) 
watermarkExpression)
+.getSqlExpression()
+
.replace(watermarkRefColumnName, newColumnName);

Review comment:
   Hi @wuchong,
   If a computed column expression references the renamed column, resolving the schema throws 
'Cannot resolve field [oldName], input field list:[xx, newName]'.
   
   You said to 'traverse the node tree to check whether it contains the renamed column', but I 
don't know where to add that. If I use a ResolverRule, how do I distinguish it from 
ReferenceResolverRule?
   
   Thanks.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] todd5167 commented on a change in pull request #15755: [FLINK-22318][table] Support RENAME column name for ALTER TABLE state…

2021-04-28 Thread GitBox


todd5167 commented on a change in pull request #15755:
URL: https://github.com/apache/flink/pull/15755#discussion_r622333275



##
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
##
@@ -144,6 +149,116 @@ public static Operation convertChangeColumn(
 // TODO: handle watermark and constraints
 }
 
+public static Operation convertRenameColumn(
+ObjectIdentifier tableIdentifier,
+String originColumnName,
+String newColumnName,
+CatalogTable catalogTable) {
+
+Schema modifiedTableSchema = catalogTable.getUnresolvedSchema();
+validateColumnName(originColumnName, newColumnName, 
modifiedTableSchema);
+
+Schema.Builder builder = Schema.newBuilder();
+// build column
+modifiedTableSchema.getColumns().stream()
+.forEach(
+column -> {
+if (StringUtils.equals(column.getName(), 
originColumnName)) {
+buildNewColumnFromOriginColumn(builder, 
column, newColumnName);
+} else {
+buildNewColumnFromOriginColumn(builder, 
column, column.getName());
+}
+});
+// build primary key column
+List<String> originPrimaryKeyNames =
+modifiedTableSchema
+.getPrimaryKey()
+.map(Schema.UnresolvedPrimaryKey::getColumnNames)
+.orElseGet(Lists::newArrayList);
+
+List<String> newPrimaryKeyNames =
+originPrimaryKeyNames.stream()
+.map(
+pkName ->
+StringUtils.equals(pkName, 
originColumnName)
+? newColumnName
+: pkName)
+.collect(Collectors.toList());
+
+if (newPrimaryKeyNames.size() > 0) {
+builder.primaryKey(newPrimaryKeyNames);
+}
+// build watermark
+modifiedTableSchema.getWatermarkSpecs().stream()
+.forEach(
+watermarkSpec -> {
+String watermarkRefColumnName = 
watermarkSpec.getColumnName();
+Expression watermarkExpression = 
watermarkSpec.getWatermarkExpression();
+if (StringUtils.equals(watermarkRefColumnName, 
originColumnName)) {
+String newWatermarkExpression =
+((SqlCallExpression) 
watermarkExpression)
+.getSqlExpression()
+
.replace(watermarkRefColumnName, newColumnName);

Review comment:
   Hi @wuchong,
   If a computed column expression references the renamed column, resolving the schema throws 
'Cannot resolve field [oldName], input field list:[xx, newName]'.
   You said to 'traverse the node tree to check whether it contains the renamed column', but I 
don't know where to add that. If I use a ResolverRule, how do I distinguish it from 
ReferenceResolverRule?
   Thanks.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #15798: [FLINK-22168][table] Fix several shortcomings that prevent schema expressions

2021-04-28 Thread GitBox


flinkbot edited a comment on pull request #15798:
URL: https://github.com/apache/flink/pull/15798#issuecomment-828566333


   
   ## CI report:
   
   * d02069bb5bff8be1d10a04cc981caa01fb3919ab Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17364)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (FLINK-22514) TypeExtractor - Improving log message

2021-04-28 Thread Avishay Balderman (Jira)
Avishay Balderman created FLINK-22514:
-

 Summary: TypeExtractor - Improving log message 
 Key: FLINK-22514
 URL: https://issues.apache.org/jira/browse/FLINK-22514
 Project: Flink
  Issue Type: Improvement
  Components: API / DataStream
Reporter: Avishay Balderman


org.apache.flink.api.java.typeutils.TypeExtractor checks whether a field in a 
class is a "valid POJO field".

The method responsible for this is:
{code:java}
isValidPojoField{code}
When isValidPojoField finds an invalid field, a log message is written (see 
below), but the message does not say which field is invalid...

So the developer has to figure out the "bad" field on their own.

Adding the field info to the log message is easy and saves the developer time.

 

 
{code:java}
for (Field field : fields) {
   Type fieldType = field.getGenericType();
   if(!isValidPojoField(field, clazz, typeHierarchy) && clazz != Row.class) {
  LOG.info("Class " + clazz + " cannot be used as a POJO type because not 
all fields are valid POJO fields, " +
 "and must be processed as GenericType. Please read the Flink 
documentation " +
 "on \"Data Types & Serialization\" for details of the effect on 
performance.");
  return null;
   }
{code}
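The message could also name the offending field, for example (an illustrative sketch, not an actual patch):
{code:java}
// Hypothetical adjustment: include the field that failed the POJO check in the log message.
LOG.info("Class " + clazz + " cannot be used as a POJO type because field '"
        + field.getName() + "' is not a valid POJO field, "
        + "and must be processed as GenericType. Please read the Flink documentation "
        + "on \"Data Types & Serialization\" for details of the effect on performance.");
{code}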
 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22514) TypeExtractor - Improving log message

2021-04-28 Thread Avishay Balderman (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Avishay Balderman updated FLINK-22514:
--
Priority: Minor  (was: Major)

> TypeExtractor - Improving log message 
> --
>
> Key: FLINK-22514
> URL: https://issues.apache.org/jira/browse/FLINK-22514
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Reporter: Avishay Balderman
>Priority: Minor
>
> org.apache.flink.api.java.typeutils.TypeExtractor checks whether a field in a 
> class is a "valid POJO field".
> The method responsible for this is:
> {code:java}
> isValidPojoField{code}
> When isValidPojoField finds an invalid field, a log message is written (see 
> below), but the message does not say which field is invalid...
> So the developer has to figure out the "bad" field on their own.
> Adding the field info to the log message is easy and saves the developer time.
>  
>  
> {code:java}
> for (Field field : fields) {
>Type fieldType = field.getGenericType();
>if(!isValidPojoField(field, clazz, typeHierarchy) && clazz != Row.class) {
>   LOG.info("Class " + clazz + " cannot be used as a POJO type because not 
> all fields are valid POJO fields, " +
>  "and must be processed as GenericType. Please read the Flink 
> documentation " +
>  "on \"Data Types & Serialization\" for details of the effect on 
> performance.");
>   return null;
>}
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot commented on pull request #15798: [FLINK-22168][table] Fix several shortcomings that prevent schema expressions

2021-04-28 Thread GitBox


flinkbot commented on pull request #15798:
URL: https://github.com/apache/flink/pull/15798#issuecomment-828566333


   
   ## CI report:
   
   * d02069bb5bff8be1d10a04cc981caa01fb3919ab UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-22438) add numRecordsOut metric for Async IO

2021-04-28 Thread Robert Metzger (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-22438:
---
Fix Version/s: (was: 1.12.3)
   1.12.4

> add numRecordsOut metric for Async IO
> -
>
> Key: FLINK-22438
> URL: https://issues.apache.org/jira/browse/FLINK-22438
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Metrics, Runtime / Task
>Affects Versions: 1.11.3, 1.12.2, 1.13.0
>Reporter: Zhengqi Zhang
>Assignee: Zhengqi Zhang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0, 1.13.1, 1.12.4
>
> Attachments: QQ截图20210424004201.png
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> In the Flink WebUI, there is no numRecordsOut metric, and the class 
> AsyncWaitOperator in fact does not have this metric. Other operators have this 
> metric, which makes it difficult to monitor the Async I/O operator and can cause 
> confusion for users.
> I think we can directly use the wrapping output class CountingOutput to 
> update the numRecordsOut metric. CountingOutput is already used in the super class of 
> AsyncWaitOperator (AbstractStreamOperator).
> Here is my commit, and I have run a test; it works.
> [my  
> commit|https://github.com/onyourhead/flink/commit/58a8ac27b292280696639caa2e311637cd631a00]
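A rough sketch of that idea (the accessor names here are assumptions; see the linked commit for the actual change):
{code:java}
// Sketch: inside AsyncWaitOperator setup, wrap the raw output so that every
// emitted record increments the operator's numRecordsOut counter.
Counter numRecordsOut = getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter();
output = new CountingOutput<>(output, numRecordsOut);
{code}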



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22479) [Kinesis][Consumer] Potential lock-up under error condition

2021-04-28 Thread Robert Metzger (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-22479:
---
Fix Version/s: (was: 1.12.3)
   1.12.4

> [Kinesis][Consumer] Potential lock-up under error condition
> ---
>
> Key: FLINK-22479
> URL: https://issues.apache.org/jira/browse/FLINK-22479
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis
>Affects Versions: 1.12.0, 1.12.1, 1.12.2
>Reporter: Danny Cranmer
>Assignee: Danny Cranmer
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.13.0, 1.14.0, 1.12.4
>
>
> *Background*
> This connector has been 
> [forked|https://github.com/awslabs/amazon-kinesis-connector-flink] by AWS for 
> use on KDA with Flink 1.11. Bugs have been encountered:
> - Under high backpressure scenarios
> - When an error is thrown during tear down
> *Scope*
> Pull in the following fixes from AWS fork:
> * Fix issue where {{KinesisDataFetcher.shutdownFetcher()}} hangs 
> ([issue|https://github.com/awslabs/amazon-kinesis-connector-flink/issues/23], 
> [pull 
> request|https://github.com/awslabs/amazon-kinesis-connector-flink/pull/24])
>   
> * Log error when shutting down Kinesis Data Fetcher 
> ([issue|https://github.com/awslabs/amazon-kinesis-connector-flink/issues/22], 
> [pull 
> request|https://github.com/awslabs/amazon-kinesis-connector-flink/pull/25])
>   
> * Treating TimeoutException as Recoverable Exception 
> ([issue|https://github.com/awslabs/amazon-kinesis-connector-flink/pull/28], 
> [pull 
> request|https://github.com/awslabs/amazon-kinesis-connector-flink/issues/21])
>   
> * Add time-out for acquiring subscription and passing events from network to 
> source thread to prevent deadlock ([pull 
> request|https://github.com/awslabs/amazon-kinesis-connector-flink/pull/18]) 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22434) Dispatcher does not store suspended jobs in execution graph store

2021-04-28 Thread Robert Metzger (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-22434:
---
Fix Version/s: (was: 1.12.3)
   1.12.4

> Dispatcher does not store suspended jobs in execution graph store
> -
>
> Key: FLINK-22434
> URL: https://issues.apache.org/jira/browse/FLINK-22434
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Reporter: Fabian Paul
>Assignee: Fabian Paul
>Priority: Major
> Fix For: 1.11.4, 1.14.0, 1.13.1, 1.12.4
>
>
> Only globally terminated jobs are currently stored in the execution graph 
> store after termination. In case the JobManager is shutdown and jobs are 
> still running, these jobs will be suspended which is a non-globally 
> terminated state.
> The problem surfaces when a user tries to access information about the job 
> during termination, leading to a job not found response. By storing all 
> terminated jobs in the execution graph store this should be fixed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink-jira-bot] sjwiesman closed pull request #11: [hotfix] Make comment language friendly to contributors

2021-04-28 Thread GitBox


sjwiesman closed pull request #11:
URL: https://github.com/apache/flink-jira-bot/pull/11


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #15760: [FLINK-19606][table-runtime-blink] Introduce WindowJoinOperator and WindowJoinOperatorBuilder.

2021-04-28 Thread GitBox


flinkbot edited a comment on pull request #15760:
URL: https://github.com/apache/flink/pull/15760#issuecomment-826500326


   
   ## CI report:
   
   * e7a99f5f45e9acb529b903d7823735f3158756fc Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17348)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #15797: [FLINK-22484][table] Add built-in MAP_KEYS, MAP_VALUES, MAP_FROM_ARRA…

2021-04-28 Thread GitBox


flinkbot edited a comment on pull request #15797:
URL: https://github.com/apache/flink/pull/15797#issuecomment-828531728


   
   ## CI report:
   
   * 9208f004adafc3997d91921efe3e7d291eb8360f Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17362)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #15631: [FLINK-20103][tests] Add UnalignedCheckpointsStressITCase

2021-04-28 Thread GitBox


flinkbot edited a comment on pull request #15631:
URL: https://github.com/apache/flink/pull/15631#issuecomment-820458718


   
   ## CI report:
   
   * 64a6f1c0ccfa13b72911e110c52814d6cf41040a Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=16621)
 
   * 8db656d8d783b61761d3de7727c0e8831c5e77cd Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17361)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot commented on pull request #15798: [FLINK-22168][table] Fix several shortcomings that prevent schema expressions

2021-04-28 Thread GitBox


flinkbot commented on pull request #15798:
URL: https://github.com/apache/flink/pull/15798#issuecomment-828544554


   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit a74e82590ac3ed03415f84c4127cfd9597f13ef4 (Wed Apr 28 
15:21:11 UTC 2021)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (FLINK-22513) Failed to upload compile artifacts

2021-04-28 Thread Dawid Wysakowicz (Jira)
Dawid Wysakowicz created FLINK-22513:


 Summary: Failed to upload compile artifacts
 Key: FLINK-22513
 URL: https://issues.apache.org/jira/browse/FLINK-22513
 Project: Flink
  Issue Type: Bug
  Components: Build System / Azure Pipelines
Affects Versions: 1.13.0
Reporter: Dawid Wysakowicz


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=17343=logs=52b61abe-a3cc-5bde-cc35-1bbe89bb7df5=5e2d176c-a6d4-5b54-7808-b11714e363ad



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] twalthr opened a new pull request #15798: [FLINK-22168][table] Fix several shortcomings that prevent schema expressions

2021-04-28 Thread GitBox


twalthr opened a new pull request #15798:
URL: https://github.com/apache/flink/pull/15798


   ## What is the purpose of the change
   
   This fixes a couple of critical bugs in the stack that prevented Table API 
expressions from being used for schema declaration. Due to time constraints, this PR 
could not make it into the 1.13 release but should be added to the next bugfix 
release for a smoother user experience. For testing and consistency, it exposes 
the Table API expression `sourceWatermark()` in the Java, Scala, and Python APIs.
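   
   For illustration, a schema declared with Table API expressions might then look like this (a 
minimal sketch with made-up column names, assuming `import static org.apache.flink.table.api.Expressions.*`):
   
   ```java
   // Sketch: an expression-based column plus the source watermark exposed by this PR.
   Schema schema =
           Schema.newBuilder()
                   .column("user_id", DataTypes.STRING())
                   .column("event_time", DataTypes.TIMESTAMP_LTZ(3))
                   .columnByExpression("upper_id", $("user_id").upperCase())
                   .watermark("event_time", sourceWatermark())
                   .build();
   ```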
   
   ## Brief change log
   
   Updates at various locations that used `TableSchema` before.
   
   ## Verifying this change
   
   This change added tests and can be verified as follows: 
`DataStreamJavaITCase.testFromAndToChangelogStreamEventTime`
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: yes
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-11103) Set a default uncaught exception handler

2021-04-28 Thread Ashwin Kolhatkar (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17334781#comment-17334781
 ] 

Ashwin Kolhatkar commented on FLINK-11103:
--

Ok, I can do that. I'll create a class {{UncaughtExceptionHandler}} as the 
utility, and use it in the main methods for all the entrypoints. 

> Set a default uncaught exception handler
> 
>
> Key: FLINK-11103
> URL: https://issues.apache.org/jira/browse/FLINK-11103
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.8.0
>Reporter: Nico Kruber
>Assignee: Ashwin Kolhatkar
>Priority: Major
>  Labels: stale-major, starter, usability
>
> We should set a default uncaught exception handler in Flink via 
> {{Thread.setDefaultUncaughtExceptionHandler()}} which at least logs the 
> exceptions. Ideally, we would even fail the job (could make this 
> configurable) but users may have some ill-behaving threads, e.g. through 
> libraries, which they would want to tolerate and we don't want to change 
> behaviour now.
> (FLINK-5232 added this for the JobManager, we need it for the TaskManager)
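A minimal sketch of the idea (the class name is made up; this is not the actual implementation):
{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class LoggingUncaughtExceptionHandler {
    private static final Logger LOG =
            LoggerFactory.getLogger(LoggingUncaughtExceptionHandler.class);

    public static void install() {
        // JVM-wide fallback: at least log exceptions that no thread handled itself.
        Thread.setDefaultUncaughtExceptionHandler(
                (thread, error) ->
                        LOG.error("Uncaught exception in thread '{}'.", thread.getName(), error));
    }
}
{code}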



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot commented on pull request #15797: [FLINK-22484][table] Add built-in MAP_KEYS, MAP_VALUES, MAP_FROM_ARRA…

2021-04-28 Thread GitBox


flinkbot commented on pull request #15797:
URL: https://github.com/apache/flink/pull/15797#issuecomment-828531728


   
   ## CI report:
   
   * 9208f004adafc3997d91921efe3e7d291eb8360f UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] dannycranmer merged pull request #15774: [FLINK-22479][Kinesis][Consumer] Potential lock-up under error condition (Flink 1.14)

2021-04-28 Thread GitBox


dannycranmer merged pull request #15774:
URL: https://github.com/apache/flink/pull/15774


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #15631: [FLINK-20103][tests] Add UnalignedCheckpointsStressITCase

2021-04-28 Thread GitBox


flinkbot edited a comment on pull request #15631:
URL: https://github.com/apache/flink/pull/15631#issuecomment-820458718


   
   ## CI report:
   
   * 64a6f1c0ccfa13b72911e110c52814d6cf41040a Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=16621)
 
   * 8db656d8d783b61761d3de7727c0e8831c5e77cd UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot commented on pull request #15797: [FLINK-22484][table] Add built-in MAP_KEYS, MAP_VALUES, MAP_FROM_ARRA…

2021-04-28 Thread GitBox


flinkbot commented on pull request #15797:
URL: https://github.com/apache/flink/pull/15797#issuecomment-828524777


   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit 9208f004adafc3997d91921efe3e7d291eb8360f (Wed Apr 28 
14:57:04 UTC 2021)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
* **This pull request references an unassigned [Jira 
ticket](https://issues.apache.org/jira/browse/FLINK-22484).** According to the 
[code contribution 
guide](https://flink.apache.org/contributing/contribute-code.html), tickets 
need to be assigned before starting with the implementation work.
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (FLINK-22452) Support specifying custom transactional.id prefix in FlinkKafkaProducer

2021-04-28 Thread Marc Demierre (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17334774#comment-17334774
 ] 

Marc Demierre edited comment on FLINK-22452 at 4/28/21, 2:54 PM:
-

I just hit this issue at my company today, due to the mentioned [Prefixed 
ACLs|https://docs.confluent.io/platform/current/kafka/authorization.html#prefixed-acls]
 on our shared Kafka cluster. They were enabled on a multi-tenant Kafka to 
avoid clashes between different tenants. I guess it could be a relatively 
common scenario.

What are the next steps on this? Following the discussions at 
https://issues.apache.org/jira/browse/FLINK-11654, is a FLIP being opened? I 
would be available to help if needed.


was (Author: mdemierre):
I just hit this issue at my company today, due to the mentioned [Prefixed 
ACLs|https://docs.confluent.io/platform/current/kafka/authorization.html#prefixed-acls]
 on our shared Kafka cluster. They were enabled on a multi-tenant to avoid 
clashes between different tenants. I guess it could be a relatively common 
scenario.

What are the next steps on this? Following the discussions at 
https://issues.apache.org/jira/browse/FLINK-11654, is a FLIP being opened? I 
would be available to help if needed.

> Support specifying custom transactional.id prefix in FlinkKafkaProducer
> ---
>
> Key: FLINK-22452
> URL: https://issues.apache.org/jira/browse/FLINK-22452
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.12.2
>Reporter: Wenhao Ji
>Priority: Major
>
> Currently, the "transactional.id"s of the Kafka producers in 
> FlinkKafkaProducer are generated based on the task name. This mechanism has 
> some limitations:
>  * It will exceed Kafka's limit if the task name is too long. (resolved 
> in FLINK-17691)
>  * The ids will very likely clash with each other if the job topologies are 
> similar. (discussed in FLINK-11654)
>  * Only certain "transactional.id"s may be authorized by [Prefixed 
> ACLs|https://docs.confluent.io/platform/current/kafka/authorization.html#prefixed-acls]
>  on the target Kafka cluster.
> Besides, the Spring community has introduced the 
> [setTransactionIdPrefix|https://docs.spring.io/spring-kafka/api/org/springframework/kafka/core/DefaultKafkaProducerFactory.html#setTransactionIdPrefix(java.lang.String)]
>  method in their Kafka client.
> Therefore, I think it will be necessary to have this feature in the Flink 
> Kafka connector.
>  
> As discussed in FLINK-11654, the possible solutions are to
>  * either introduce an additional method called 
> setTransactionalIdPrefix(String) in the FlinkKafkaProducer,
>  * or use the existing "transactional.id" property as the prefix.
>  The "transactional.id" generation would then
>  * keep the current behavior if no prefix is given,
>  * use the given value as the prefix for the TransactionalIdsGenerator if present.
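
A minimal sketch of how the two proposals could look from the user's side, 
under the assumption that they are implemented as described above: the 
setTransactionalIdPrefix(String) setter does not exist on FlinkKafkaProducer 
today, and treating the existing "transactional.id" property as a prefix is 
likewise only a proposal from this ticket. Broker address, topic, and prefix 
values are made up.

{code}
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

public class TransactionalIdPrefixSketch {

    public static FlinkKafkaProducer<String> createProducer() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092");

        // Proposal 2: reuse the existing "transactional.id" property as the
        // prefix; the TransactionalIdsGenerator would append its own suffixes.
        props.setProperty("transactional.id", "my-tenant.flink-job");

        // A real exactly-once job would use one of the constructors that takes
        // FlinkKafkaProducer.Semantic.EXACTLY_ONCE; kept minimal here.
        FlinkKafkaProducer<String> producer =
                new FlinkKafkaProducer<>("output-topic", new SimpleStringSchema(), props);

        // Proposal 1: a dedicated setter (hypothetical, not part of the API yet).
        // producer.setTransactionalIdPrefix("my-tenant.flink-job");

        return producer;
    }
}
{code}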



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22484) Built-in functions for collections

2021-04-28 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22484?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-22484:
---
Labels: pull-request-available  (was: )

> Built-in functions for collections
> -
>
> Key: FLINK-22484
> URL: https://issues.apache.org/jira/browse/FLINK-22484
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
>
> A number of built-in functions for working with collections are 
> supported by other vendors. After looking at PostgreSQL, BigQuery and Spark, 
> a list of more or less generic functions for collections was selected 
> (for more details see [1]).
> Feedback on the doc is welcome.
> [1] 
> [https://docs.google.com/document/d/1nS0Faur9CCop4sJoQ2kMQ2XU1hjg1FaiTSQp2RsZKEE/edit?usp=sharing]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] snuyanzin opened a new pull request #15797: [FLINK-22484][table] Add built-in MAP_KEYS, MAP_VALUES, MAP_FROM_ARRA…

2021-04-28 Thread GitBox


snuyanzin opened a new pull request #15797:
URL: https://github.com/apache/flink/pull/15797


   
   
   ## What is the purpose of the change
   
   The PR implements three functions from the list mentioned in FLINK-22484: 
`MAP_KEYS`, `MAP_VALUES`, and `MAP_FROM_ARRAYS`.
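
   As a quick illustration, a minimal sketch of how the three functions could 
be called from SQL via the Table API is shown below. The function names are 
the ones added in this PR; everything else (the query, the column aliases, and 
the use of the standard MAP/ARRAY constructors as inputs) is an assumption for 
illustration only, not taken from the PR itself.

```java
// Minimal usage sketch (assumptions noted above); requires flink-table-api-java
// and a planner on the classpath.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MapFunctionsSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(
                        EnvironmentSettings.newInstance().inStreamingMode().build());

        // MAP_FROM_ARRAYS builds a map from a key array and a value array;
        // MAP_KEYS / MAP_VALUES turn a map back into arrays of keys / values.
        tEnv.executeSql(
                        "SELECT"
                                + " MAP_KEYS(MAP['a', 1, 'b', 2]) AS ks,"
                                + " MAP_VALUES(MAP['a', 1, 'b', 2]) AS vs,"
                                + " MAP_FROM_ARRAYS(ARRAY['a', 'b'], ARRAY[1, 2]) AS m")
                .print();
    }
}
```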
   
   
   ## Brief change log
   
   Implementation, tests and docs for `MAP_KEYS`, `MAP_VALUES`, 
`MAP_FROM_ARRAYS`
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   * `CollectionFunctionsITCase`
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency):  no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: yes
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? docs / JavaDocs
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-22452) Support specifying custom transactional.id prefix in FlinkKafkaProducer

2021-04-28 Thread Marc Demierre (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17334774#comment-17334774
 ] 

Marc Demierre commented on FLINK-22452:
---

I just hit this issue at my company today, due to the mentioned [Prefixed 
ACLs|https://docs.confluent.io/platform/current/kafka/authorization.html#prefixed-acls]
 on our shared Kafka cluster. They were enabled on a multi-tenant cluster to avoid 
clashes between different tenants. I guess it could be a relatively common 
scenario.

What are the next steps on this? Following the discussions at 
https://issues.apache.org/jira/browse/FLINK-11654, is a FLIP being opened? I 
would be available to help if needed.

> Support specifying custom transactional.id prefix in FlinkKafkaProducer
> ---
>
> Key: FLINK-22452
> URL: https://issues.apache.org/jira/browse/FLINK-22452
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.12.2
>Reporter: Wenhao Ji
>Priority: Major
>
> Currently, the "transactional.id"s of the Kafka producers in 
> FlinkKafkaProducer are generated based on the task name. This mechanism has 
> some limitations:
>  * It will exceed Kafka's limit if the task name is too long. (resolved 
> in FLINK-17691)
>  * The ids will very likely clash with each other if the job topologies are 
> similar. (discussed in FLINK-11654)
>  * Only certain "transactional.id"s may be authorized by [Prefixed 
> ACLs|https://docs.confluent.io/platform/current/kafka/authorization.html#prefixed-acls]
>  on the target Kafka cluster.
> Besides, the Spring community has introduced the 
> [setTransactionIdPrefix|https://docs.spring.io/spring-kafka/api/org/springframework/kafka/core/DefaultKafkaProducerFactory.html#setTransactionIdPrefix(java.lang.String)]
>  method in their Kafka client.
> Therefore, I think it will be necessary to have this feature in the Flink 
> Kafka connector.
>  
> As discussed in FLINK-11654, the possible solutions are to
>  * either introduce an additional method called 
> setTransactionalIdPrefix(String) in the FlinkKafkaProducer,
>  * or use the existing "transactional.id" property as the prefix.
>  The "transactional.id" generation would then
>  * keep the current behavior if no prefix is given,
>  * use the given value as the prefix for the TransactionalIdsGenerator if present.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on pull request #15712: [FLINK-22400][hive connect]fix NPE problem when convert flink object for Map

2021-04-28 Thread GitBox


flinkbot edited a comment on pull request #15712:
URL: https://github.com/apache/flink/pull/15712#issuecomment-824189664


   
   ## CI report:
   
   * 8256b986bcfce17787558b17bba6c2c6a9be24c2 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17342)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #15479: [FLINK-19606][table-runtime-blink] Introduce StreamExecWindowJoin and window join it cases

2021-04-28 Thread GitBox


flinkbot edited a comment on pull request #15479:
URL: https://github.com/apache/flink/pull/15479#issuecomment-812344008


   
   ## CI report:
   
   * 3764b47414eba44d4ea2f53b6973fe59f3356cfc Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17359)
 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17045)
 
   * 1b562e2eec41f48d9ac9b4e7591240b6c8d99e61 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17360)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #15121: The method $(String) is undefined for the type TableExample

2021-04-28 Thread GitBox


flinkbot edited a comment on pull request #15121:
URL: https://github.com/apache/flink/pull/15121#issuecomment-793480240


   
   ## CI report:
   
   * cf26ce895a7956d258acc073817f578558e78227 UNKNOWN
   * 709c4110370b845f42d916a06e56c6026cf2fac8 UNKNOWN
   * 4ea99332e1997eebca1f3f0a9d9229b8265fe32c UNKNOWN
   * 9a256ec6b644a0f795376076f4e3ff4e85aaad1e UNKNOWN
   * b7dca63a2e98f09c8c780a21091b5268d897b220 UNKNOWN
   * 2b7bb05a4dbbb0883db566589dfe1975fdbb279d UNKNOWN
   * 60b3b4f14b78889b827c5e573b1d4ef992791d32 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17324)
 
   * 00d3356d86576e2a129405a92f9ebb2acb5d6412 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17358)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] pnowojski commented on pull request #15631: [FLINK-20103][tests] Add UnalignedCheckpointsStressITCase

2021-04-28 Thread GitBox


pnowojski commented on pull request #15631:
URL: https://github.com/apache/flink/pull/15631#issuecomment-828499112


   I have addressed the feedback. I have also included a couple of extra 
commits from @AHeise, so please pay special attention to those.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] pnowojski commented on a change in pull request #15631: [FLINK-20103][tests] Add UnalignedCheckpointsStressITCase

2021-04-28 Thread GitBox


pnowojski commented on a change in pull request #15631:
URL: https://github.com/apache/flink/pull/15631#discussion_r67839



##
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointStressITCase.java
##
@@ -0,0 +1,553 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import 
org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import 
org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import 
org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static 
org.apache.flink.configuration.CheckpointingOptions.CHECKPOINTS_DIRECTORY;
+import static 
org.apache.flink.configuration.CheckpointingOptions.MAX_RETAINED_CHECKPOINTS;
+import static 
org.apache.flink.shaded.guava18.com.google.common.collect.Iterables.getOnlyElement;
+import static org.apache.flink.test.util.TestUtils.submitJobAndWaitForResult;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A stress test that runs for a pre-defined amount of time, verifying data
+ * correctness and triggering a failover every couple of checkpoints in order
+ * to stress test unaligned checkpoints.
+ */
+@SuppressWarnings("serial")
+public class UnalignedCheckpointStressITCase extends TestLogger {
+
+protected static final int CHECKPOINT_INTERVAL = 20;
+protected static final int MINIMUM_COMPLETED_CHECKPOINTS_BETWEEN_FAILURES 
= 2;
+protected static final int MAXIMUM_COMPLETED_CHECKPOINTS_BETWEEN_FAILURES 
= 10;
+protected static final long TEST_DURATION = 
Time.minutes(1).toMilliseconds();
+protected static final int 

[GitHub] [flink] pnowojski commented on a change in pull request #15631: [FLINK-20103][tests] Add UnalignedCheckpointsStressITCase

2021-04-28 Thread GitBox


pnowojski commented on a change in pull request #15631:
URL: https://github.com/apache/flink/pull/15631#discussion_r63090



##
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointStressITCase.java
##
@@ -0,0 +1,553 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import 
org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import 
org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import 
org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static 
org.apache.flink.configuration.CheckpointingOptions.CHECKPOINTS_DIRECTORY;
+import static 
org.apache.flink.configuration.CheckpointingOptions.MAX_RETAINED_CHECKPOINTS;
+import static 
org.apache.flink.shaded.guava18.com.google.common.collect.Iterables.getOnlyElement;
+import static org.apache.flink.test.util.TestUtils.submitJobAndWaitForResult;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A stress test that runs for a pre-defined amount of time, verifying data
+ * correctness and triggering a failover every couple of checkpoints in order
+ * to stress test unaligned checkpoints.
+ */
+@SuppressWarnings("serial")
+public class UnalignedCheckpointStressITCase extends TestLogger {
+
+protected static final int CHECKPOINT_INTERVAL = 20;
+protected static final int MINIMUM_COMPLETED_CHECKPOINTS_BETWEEN_FAILURES 
= 2;
+protected static final int MAXIMUM_COMPLETED_CHECKPOINTS_BETWEEN_FAILURES 
= 10;
+protected static final long TEST_DURATION = 
Time.minutes(1).toMilliseconds();
+protected static final int 

[GitHub] [flink] pnowojski commented on a change in pull request #15631: [FLINK-20103][tests] Add UnalignedCheckpointsStressITCase

2021-04-28 Thread GitBox


pnowojski commented on a change in pull request #15631:
URL: https://github.com/apache/flink/pull/15631#discussion_r60261



##
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointStressITCase.java
##
@@ -0,0 +1,553 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import 
org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import 
org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import 
org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static 
org.apache.flink.configuration.CheckpointingOptions.CHECKPOINTS_DIRECTORY;
+import static 
org.apache.flink.configuration.CheckpointingOptions.MAX_RETAINED_CHECKPOINTS;
+import static 
org.apache.flink.shaded.guava18.com.google.common.collect.Iterables.getOnlyElement;
+import static org.apache.flink.test.util.TestUtils.submitJobAndWaitForResult;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A stress test that runs for a pre-defined amount of time, verifying data
+ * correctness and triggering a failover every couple of checkpoints in order
+ * to stress test unaligned checkpoints.
+ */
+@SuppressWarnings("serial")
+public class UnalignedCheckpointStressITCase extends TestLogger {
+
+protected static final int CHECKPOINT_INTERVAL = 20;
+protected static final int MINIMUM_COMPLETED_CHECKPOINTS_BETWEEN_FAILURES 
= 2;
+protected static final int MAXIMUM_COMPLETED_CHECKPOINTS_BETWEEN_FAILURES 
= 10;
+protected static final long TEST_DURATION = 
Time.minutes(1).toMilliseconds();

Review comment:
   Depending on machine 
