[GitHub] [flink] flinkbot edited a comment on pull request #13678: [FLINK-19586] Add stream committer operators for new Sink API

2020-10-21 Thread GitBox


flinkbot edited a comment on pull request #13678:
URL: https://github.com/apache/flink/pull/13678#issuecomment-711439155


   
   ## CI report:
   
   *  Unknown: [CANCELED](TBD) 
   * 463b5c8ed21f93caaeb7b938aa9e72abb35619b2 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] gm7y8 commented on pull request #13458: FLINK-18851 [runtime] Add checkpoint type to checkpoint history entries in Web UI

2020-10-21 Thread GitBox


gm7y8 commented on pull request #13458:
URL: https://github.com/apache/flink/pull/13458#issuecomment-714249342


   > > > @XComp I was able to identify the code fix. I am working to unit test it; I 
had some issues with the Flink setup to start a job with a checkpoint and 
savepoint. It might take a day or so as I am doing it for the first time.
   > > 
   > > 
   > > Hi @gm7y8, thanks for getting back to us. Let's see if we can finish 
this before the code freeze next week. Here are a few remarks:
   > > 
   > > * The build is failing for me locally (`mvn -pl flink-runtime-web -Dfast 
-DskipTests install`) with the following error message:
   > > 
   > > ```
   > > [ERROR] ERROR in 
src/app/pages/job/checkpoints/detail/job-checkpoints-detail.component.html(25,38):
 : Element implicitly has an 'any' type because type 
'CheckPointDetailInterface' has no index signature.
   > > [ERROR] 
src/app/pages/job/checkpoints/detail/job-checkpoints-detail.component.html(25,38):
 : Property 'checkpoint_type' does not exist on type 
'JobCheckpointsDetailComponent'.
   > > [ERROR] 
src/app/pages/job/checkpoints/detail/job-checkpoints-detail.component.html(25,38):
 : Property 'checkPointConfig' does not exist on type 
'JobCheckpointsDetailComponent'.
   > > [ERROR] 
src/app/pages/job/checkpoints/detail/job-checkpoints-detail.component.html(25,38):
 : Element implicitly has an 'any' type because type 
'CheckPointDetailInterface' has no index signature.
   > > [ERROR] 
src/app/pages/job/checkpoints/detail/job-checkpoints-detail.component.html(25,38):
 : Property 'checkpoint_type' does not exist on type 
'JobCheckpointsDetailComponent'.
   > > [ERROR] 
src/app/pages/job/checkpoints/detail/job-checkpoints-detail.component.html(25,38):
 : Element implicitly has an 'any' type because type 
'CheckPointDetailInterface' has no index signature.
   > > [ERROR] 
src/app/pages/job/checkpoints/detail/job-checkpoints-detail.component.html(25,38):
 : Property 'checkpoint_type' does not exist on type 
'JobCheckpointsDetailComponent'.
   > > ```
   > > 
   > > 
   > > It's always good to run a final `mvn install` at least on the modules you 
touched to verify your changes. Alternatively (or rather as an additional 
tool), you can set up Azure CI on your fork. That will run a full test covering 
all modules for each commit on your fork. Check out the [tutorial for setting 
up Azure 
CI](https://cwiki.apache.org/confluence/display/FLINK/Azure+Pipelines#AzurePipelines-Tutorial:SettingupAzurePipelinesforaforkoftheFlinkrepository).
   > > 
   > > * Changes like 
[job-checkpoints-detail.component.html](https://github.com/apache/flink/pull/13458/commits/0b8e369aa5c8c4e4a8c370b4b848c0f197fcdb9d#diff-a1c50fd814fe8a1c03567f9c71877873251223cfaf32c7067068f7f30d8c5c42R25)
 do not reflect @AHeise's request, as you use uppercase tokens here. @AHeise 
suggested normal casing (i.e. lowercase) as it helps improve readability.
   > > * Additionally, I realized that you didn't address all points of the PR 
description template. Could you fix that?
   > > 
   > > Thanks.
   > 
   > Sure, I fixed the build issue you mentioned, and the build has passed now. 
I will also change the letters to lowercase.
   
   Also updated the description to reflect the PR guidelines.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-19754) Cannot have more than one execute() or executeAsync() call in a single environment.

2020-10-21 Thread little-tomato (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17218776#comment-17218776
 ] 

little-tomato commented on FLINK-19754:
---

Source: Custom Source -> Map -> Map -> Filter -> Map -> 
SourceConversion(table=[default_catalog.default_database.ruleengine], 
fields=[product_key, productName, device_key, deviceName, alarmControl, 
geoLocation, currentTemperature, airSpeed, workMode, setTemperature, 
powerSwitch, batteryLevel, alarmCondition, operatSwitch, lightVoltage, 
lightCurrent, lightIllumination, powerConsumption, tiltValue, lightStatus, 
originalData]) -> (Calc(select=[productName]) -> SinkConversionToRow -> Sink: 
KafkaTableSink(msg), Calc(select=[deviceName]) -> SinkConversionToRow -> Sink: 
KafkaTableSink(msg))

> Cannot have more than one execute() or executeAsync() call in a single 
> environment.
> ---
>
> Key: FLINK-19754
> URL: https://issues.apache.org/jira/browse/FLINK-19754
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.11.2
>Reporter: little-tomato
>Priority: Major
>
> I run this code on my Standalone cluster. When I submit the job, the error log 
> is as follows:
> {code}
> 2020-10-20 11:53:42,969 WARN 
> org.apache.flink.client.deployment.application.DetachedApplicationRunner [] - 
> Could not execute application: 
>  org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: Cannot have more than one execute() or executeAsync() call 
> in a single environment.
>  at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
>  ~[flink-clients_2.12-1.11.0.jar:1.11.0]
>  at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>  ~[flink-clients_2.12-1.11.0.jar:1.11.0]
>  at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) 
> ~[flink-clients_2.12-1.11.0.jar:1.11.0]
>  at 
> org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78)
>  ~[flink-clients_2.12-1.11.0.jar:1.11.0]
>  at 
> org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:67)
>  ~[flink-clients_2.12-1.11.0.jar:1.11.0]
>  at 
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:100)
>  ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>  at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>  [?:1.8.0_221]
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> [?:1.8.0_221]
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_221]
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  [?:1.8.0_221]
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  [?:1.8.0_221]
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  [?:1.8.0_221]
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  [?:1.8.0_221]
>  at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
>  Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot have more 
> than one execute() or executeAsync() call in a single environment.
>  at 
> org.apache.flink.client.program.StreamContextEnvironment.validateAllowedExecution(StreamContextEnvironment.java:139)
>  ~[flink-clients_2.12-1.11.0.jar:1.11.0]
>  at 
> org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:127)
>  ~[flink-clients_2.12-1.11.0.jar:1.11.0]
>  at 
> org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
>  ~[flink-clients_2.12-1.11.0.jar:1.11.0]
>  at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1697)
>  ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>  at cn.cuiot.dmp.ruleengine.job.RuleEngineJob.main(RuleEngineJob.java:556) 
> ~[?:?]
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_221]
>  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
> ~[?:1.8.0_221]
>  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  ~[?:1.8.0_221]
>  at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_221]
>  at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>  ~[flink-clients_2.12-1.11.0.jar:1.11.0]
> {code}
> my code is:
> {code:java}
> final StreamExecutionEnvironment env = 
> 
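A minimal sketch, not the reporter's actual job, of how this error typically 
arises in Flink 1.11: {{executeSql()}} on an INSERT statement already submits a 
job on its own, so a subsequent {{env.execute()}} in the same program is a 
second submission, which the web-submission runner rejects. The table names and 
job name below are hypothetical.

{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class TwoSubmissionsSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Hypothetical DataStream part of the pipeline.
        env.fromElements("a", "b", "c").print();

        // executeSql() on an INSERT submits a job by itself (first submission).
        tEnv.executeSql("INSERT INTO sink_table SELECT * FROM source_table");

        // Second submission in the same environment -> FlinkRuntimeException:
        // "Cannot have more than one execute() or executeAsync() call in a single environment."
        env.execute("rule-engine-job");
    }
}
{code}

A common fix is to let a single call submit everything, e.g. group the INSERT 
statements into a {{StatementSet}} and execute it once, dropping the extra 
{{env.execute()}}.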

[jira] [Comment Edited] (FLINK-19759) DeadlockBreakupTest.testSubplanReuse_AddSingletonExchange is instable

2020-10-21 Thread Dian Fu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19759?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17218765#comment-17218765
 ] 

Dian Fu edited comment on FLINK-19759 at 10/22/20, 5:54 AM:


cc [~TsReaper] [~godfreyhe]


was (Author: dian.fu):
cc [~TsReaper]

> DeadlockBreakupTest.testSubplanReuse_AddSingletonExchange is instable
> -
>
> Key: FLINK-19759
> URL: https://issues.apache.org/jira/browse/FLINK-19759
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.0
>Reporter: Dian Fu
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.12.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=8052=logs=e25d5e7e-2a9c-5589-4940-0b638d75a414=a6e0f756-5bb9-5ea8-a468-5f60db442a29
> {code}
> [ERROR]   DeadlockBreakupTest.testSubplanReuse_AddSingletonExchange:217 
> planAfter expected:<...=[>(cnt, 3)])
> :  +- [SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS cnt], 
> reuse_id=[1])
> : +- Exchange(distribution=[single])
> :+- LocalSort]Aggregate(select=[Pa...> but was:<...=[>(cnt, 3)])
> :  +- [HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS cnt], 
> reuse_id=[1])
> : +- Exchange(distribution=[single])
> :+- LocalHash]Aggregate(select=[Pa...>
> [INFO] 
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19759) DeadlockBreakupTest.testSubplanReuse_AddSingletonExchange is instable

2020-10-21 Thread Dian Fu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19759?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17218772#comment-17218772
 ] 

Dian Fu commented on FLINK-19759:
-

Upgrading to Blocker as it's continuously failing.

> DeadlockBreakupTest.testSubplanReuse_AddSingletonExchange is instable
> -
>
> Key: FLINK-19759
> URL: https://issues.apache.org/jira/browse/FLINK-19759
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.0
>Reporter: Dian Fu
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.12.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=8052=logs=e25d5e7e-2a9c-5589-4940-0b638d75a414=a6e0f756-5bb9-5ea8-a468-5f60db442a29
> {code}
> [ERROR]   DeadlockBreakupTest.testSubplanReuse_AddSingletonExchange:217 
> planAfter expected:<...=[>(cnt, 3)])
> :  +- [SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS cnt], 
> reuse_id=[1])
> : +- Exchange(distribution=[single])
> :+- LocalSort]Aggregate(select=[Pa...> but was:<...=[>(cnt, 3)])
> :  +- [HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS cnt], 
> reuse_id=[1])
> : +- Exchange(distribution=[single])
> :+- LocalHash]Aggregate(select=[Pa...>
> [INFO] 
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-19759) DeadlockBreakupTest.testSubplanReuse_AddSingletonExchange is instable

2020-10-21 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19759?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu updated FLINK-19759:

Priority: Blocker  (was: Major)

> DeadlockBreakupTest.testSubplanReuse_AddSingletonExchange is instable
> -
>
> Key: FLINK-19759
> URL: https://issues.apache.org/jira/browse/FLINK-19759
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.0
>Reporter: Dian Fu
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.12.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=8052=logs=e25d5e7e-2a9c-5589-4940-0b638d75a414=a6e0f756-5bb9-5ea8-a468-5f60db442a29
> {code}
> [ERROR]   DeadlockBreakupTest.testSubplanReuse_AddSingletonExchange:217 
> planAfter expected:<...=[>(cnt, 3)])
> :  +- [SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS cnt], 
> reuse_id=[1])
> : +- Exchange(distribution=[single])
> :+- LocalSort]Aggregate(select=[Pa...> but was:<...=[>(cnt, 3)])
> :  +- [HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS cnt], 
> reuse_id=[1])
> : +- Exchange(distribution=[single])
> :+- LocalHash]Aggregate(select=[Pa...>
> [INFO] 
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19759) DeadlockBreakupTest.testSubplanReuse_AddSingletonExchange is instable

2020-10-21 Thread Dian Fu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19759?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17218771#comment-17218771
 ] 

Dian Fu commented on FLINK-19759:
-

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=8053=logs=e25d5e7e-2a9c-5589-4940-0b638d75a414=a6e0f756-5bb9-5ea8-a468-5f60db442a29

> DeadlockBreakupTest.testSubplanReuse_AddSingletonExchange is instable
> -
>
> Key: FLINK-19759
> URL: https://issues.apache.org/jira/browse/FLINK-19759
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.0
>Reporter: Dian Fu
>Priority: Major
>  Labels: test-stability
> Fix For: 1.12.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=8052=logs=e25d5e7e-2a9c-5589-4940-0b638d75a414=a6e0f756-5bb9-5ea8-a468-5f60db442a29
> {code}
> [ERROR]   DeadlockBreakupTest.testSubplanReuse_AddSingletonExchange:217 
> planAfter expected:<...=[>(cnt, 3)])
> :  +- [SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS cnt], 
> reuse_id=[1])
> : +- Exchange(distribution=[single])
> :+- LocalSort]Aggregate(select=[Pa...> but was:<...=[>(cnt, 3)])
> :  +- [HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS cnt], 
> reuse_id=[1])
> : +- Exchange(distribution=[single])
> :+- LocalHash]Aggregate(select=[Pa...>
> [INFO] 
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19759) DeadlockBreakupTest.testSubplanReuse_AddSingletonExchange is instable

2020-10-21 Thread Dian Fu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19759?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17218765#comment-17218765
 ] 

Dian Fu commented on FLINK-19759:
-

cc [~TsReaper]

> DeadlockBreakupTest.testSubplanReuse_AddSingletonExchange is instable
> -
>
> Key: FLINK-19759
> URL: https://issues.apache.org/jira/browse/FLINK-19759
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.0
>Reporter: Dian Fu
>Priority: Major
>  Labels: test-stability
> Fix For: 1.12.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=8052=logs=e25d5e7e-2a9c-5589-4940-0b638d75a414=a6e0f756-5bb9-5ea8-a468-5f60db442a29
> {code}
> [ERROR]   DeadlockBreakupTest.testSubplanReuse_AddSingletonExchange:217 
> planAfter expected:<...=[>(cnt, 3)])
> :  +- [SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS cnt], 
> reuse_id=[1])
> : +- Exchange(distribution=[single])
> :+- LocalSort]Aggregate(select=[Pa...> but was:<...=[>(cnt, 3)])
> :  +- [HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS cnt], 
> reuse_id=[1])
> : +- Exchange(distribution=[single])
> :+- LocalHash]Aggregate(select=[Pa...>
> [INFO] 
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19759) DeadlockBreakupTest.testSubplanReuse_AddSingletonExchange is instable

2020-10-21 Thread Dian Fu (Jira)
Dian Fu created FLINK-19759:
---

 Summary: DeadlockBreakupTest.testSubplanReuse_AddSingletonExchange 
is instable
 Key: FLINK-19759
 URL: https://issues.apache.org/jira/browse/FLINK-19759
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.12.0
Reporter: Dian Fu
 Fix For: 1.12.0


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=8052=logs=e25d5e7e-2a9c-5589-4940-0b638d75a414=a6e0f756-5bb9-5ea8-a468-5f60db442a29

{code}
[ERROR]   DeadlockBreakupTest.testSubplanReuse_AddSingletonExchange:217 
planAfter expected:<...=[>(cnt, 3)])
:  +- [SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS cnt], 
reuse_id=[1])
: +- Exchange(distribution=[single])
:+- LocalSort]Aggregate(select=[Pa...> but was:<...=[>(cnt, 3)])
:  +- [HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS cnt], 
reuse_id=[1])
: +- Exchange(distribution=[single])
:+- LocalHash]Aggregate(select=[Pa...>
[INFO] 
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-19759) DeadlockBreakupTest.testSubplanReuse_AddSingletonExchange is instable

2020-10-21 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19759?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu updated FLINK-19759:

Labels: test-stability  (was: )

> DeadlockBreakupTest.testSubplanReuse_AddSingletonExchange is instable
> -
>
> Key: FLINK-19759
> URL: https://issues.apache.org/jira/browse/FLINK-19759
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.0
>Reporter: Dian Fu
>Priority: Major
>  Labels: test-stability
> Fix For: 1.12.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=8052=logs=e25d5e7e-2a9c-5589-4940-0b638d75a414=a6e0f756-5bb9-5ea8-a468-5f60db442a29
> {code}
> [ERROR]   DeadlockBreakupTest.testSubplanReuse_AddSingletonExchange:217 
> planAfter expected:<...=[>(cnt, 3)])
> :  +- [SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS cnt], 
> reuse_id=[1])
> : +- Exchange(distribution=[single])
> :+- LocalSort]Aggregate(select=[Pa...> but was:<...=[>(cnt, 3)])
> :  +- [HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS cnt], 
> reuse_id=[1])
> : +- Exchange(distribution=[single])
> :+- LocalHash]Aggregate(select=[Pa...>
> [INFO] 
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] guoweiM commented on pull request #13678: [FLINK-19586] Add stream committer operators for new Sink API

2020-10-21 Thread GitBox


guoweiM commented on pull request #13678:
URL: https://github.com/apache/flink/pull/13678#issuecomment-714246544


   @flinkbot run azure



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] godfreyhe merged pull request #13737: [hotfix] Fix DeadlockBreakupTest due to Calcite upgrade

2020-10-21 Thread GitBox


godfreyhe merged pull request #13737:
URL: https://github.com/apache/flink/pull/13737


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-19757) TimeStampData can cause time inconsistent problem

2020-10-21 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17218759#comment-17218759
 ] 

xiaogang zhou commented on FLINK-19757:
---

[~jark] Thanks for commenting. If I run this SQL:

tEnv.executeSql("CREATE TABLE Source (\n" +
 " -- declare the schema of the table\n" +
 " `name` STRING,\n" +
 " `num` INT,\n" +
 " `xtime` as proctime()\n"+
 ") WITH (\n" +
 " -- declare the external system to connect to\n" +
 " 'connector' = 'bsql-datagen',\n" +
 " 'rows-per-second' = '1'\n" +
 ")");

 

and then call stream.print(), I get a time 8 hours in the past (as my default time 
zone is +8). How can I fix this problem?

 

Furthermore, if I print:

Timestamp ts = new Timestamp(System.currentTimeMillis());

System.err.println(TimestampData.fromTimestamp(ts));    
//2020-10-22T13:40:28.596
System.err.println(TimestampData.fromEpochMillis(System.currentTimeMillis()));  
 //2020-10-22T05:40:28.724

 

This is pretty strange. Can you please suggest how I can get the 
correct time?

Thanks for your time.

 

 
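The 8-hour gap can be reproduced with plain {{java.time}}, without any Flink 
API: {{fromTimestamp}} effectively renders the instant in the JVM's default 
zone, while {{fromEpochMillis}} renders the same instant as UTC wall-clock 
time. A minimal sketch (variable names are illustrative):

{code:java}
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

public class OffsetSketch {
    public static void main(String[] args) {
        long now = System.currentTimeMillis();

        // What fromTimestamp(new Timestamp(now)) effectively prints:
        // the instant rendered in the JVM's default zone (e.g. UTC+8).
        LocalDateTime localWallClock =
                Instant.ofEpochMilli(now).atZone(ZoneId.systemDefault()).toLocalDateTime();

        // What fromEpochMillis(now) effectively prints:
        // the same instant rendered as UTC wall-clock time.
        LocalDateTime utcWallClock =
                Instant.ofEpochMilli(now).atZone(ZoneOffset.UTC).toLocalDateTime();

        System.out.println(localWallClock); // e.g. 2020-10-22T13:40:28.596
        System.out.println(utcWallClock);   // e.g. 2020-10-22T05:40:28.596 at UTC+8
    }
}
{code}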

> TimeStampData can cause time inconsistent problem
> -
>
> Key: FLINK-19757
> URL: https://issues.apache.org/jira/browse/FLINK-19757
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.11.1
>Reporter: xiaogang zhou
>Priority: Major
>  Labels: pull-request-available
>
> When we check the JDK LocalDateTime code, we find that
>  
> {code:java}
> // code placeholder
> public static LocalDateTime ofEpochSecond(long epochSecond, int nanoOfSecond, 
> ZoneOffset offset) {
> Objects.requireNonNull(offset, "offset");
> NANO_OF_SECOND.checkValidValue(nanoOfSecond);
> long localSecond = epochSecond + offset.getTotalSeconds();  // overflow 
> caught later
> long localEpochDay = Math.floorDiv(localSecond, SECONDS_PER_DAY);
> int secsOfDay = (int)Math.floorMod(localSecond, SECONDS_PER_DAY);
> LocalDate date = LocalDate.ofEpochDay(localEpochDay);
> LocalTime time = LocalTime.ofNanoOfDay(secsOfDay * NANOS_PER_SECOND + 
> nanoOfSecond);
> return new LocalDateTime(date, time);
> }
> {code}
>  
> With offset.getTotalSeconds() they add the offset, but in TimestampData's
> toLocalDateTime we don't add an offset.
>  
> I'd like to add a TimeZone.getDefault().getRawOffset() in 
> toLocalDateTime()
> and subtract a TimeZone.getDefault().getRawOffset() in 
> fromLocalDateTime



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] KarmaGYZ commented on pull request #13581: [FLINK-17331] Explicitly get the ByteBuf length of all classes which …

2020-10-21 Thread GitBox


KarmaGYZ commented on pull request #13581:
URL: https://github.com/apache/flink/pull/13581#issuecomment-714242065


   cc @zhijiangW 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] wangyang0918 commented on a change in pull request #13644: [FLINK-19542][k8s]Implement LeaderElectionService and LeaderRetrievalService based on Kubernetes API

2020-10-21 Thread GitBox


wangyang0918 commented on a change in pull request #13644:
URL: https://github.com/apache/flink/pull/13644#discussion_r509891311



##
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java
##
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import 
org.apache.flink.kubernetes.kubeclient.KubernetesLeaderElectionConfiguration;
+
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
+import io.fabric8.kubernetes.client.extended.leaderelection.LeaderCallbacks;
+import 
io.fabric8.kubernetes.client.extended.leaderelection.LeaderElectionConfigBuilder;
+import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector;
+import 
io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.ConfigMapLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.UUID;
+
+/**
+ * Represents the {@link KubernetesLeaderElector} in Kubernetes. {@link 
LeaderElector#run()} is a blocking call. It should be
+ * run in the IO executor, not the main thread. The lifecycle is bound to a 
single leader election. Once the leadership
+ * is revoked, i.e. {@link LeaderCallbackHandler#notLeader()} is 
called, {@link LeaderElector#run()} will
+ * finish. To start another round of election, we need to trigger it again.
+ */
+public class KubernetesLeaderElector extends 
LeaderElector {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(KubernetesLeaderElector.class);
+   protected static final String LOCK_IDENTITY = 
UUID.randomUUID().toString();

Review comment:
   Maybe I am not making myself clear. The `lockIdentity` will be a 
non-static field in `KubernetesHaServices` after the refactoring, which means 
each `KubernetesHaServices` instance will have a dedicated lock identity shared 
by all the components above, and different instances will have different 
identities.
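   A tiny sketch of the proposed shape (class layout and names are 
illustrative, not the final code):
   
   ```java
   import java.util.UUID;
   
   // Sketch: one lock identity per HA-services instance, shared by all the
   // components (ResourceManager, Dispatcher, RestEndpoint, JobManagers) it creates.
   public class KubernetesHaServices {
       private final String lockIdentity = UUID.randomUUID().toString();
       // A second KubernetesHaServices instance gets a different identity.
   }
   ```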





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13690: [FLINK-16595][YARN]support more HDFS nameServices in yarn mode when security enabled. Is…

2020-10-21 Thread GitBox


flinkbot edited a comment on pull request #13690:
URL: https://github.com/apache/flink/pull/13690#issuecomment-712304240


   
   ## CI report:
   
   * 518581ab52a2d976ff344404a6828cec23c260f9 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8054)
 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8039)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13739: [FLINK-19232][python] support iterating MapState and MapView

2020-10-21 Thread GitBox


flinkbot edited a comment on pull request #13739:
URL: https://github.com/apache/flink/pull/13739#issuecomment-714232634


   
   ## CI report:
   
   * 9f75014b48308eb93937e11fcd8f4da80de12bd4 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8074)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] wangyang0918 commented on a change in pull request #13644: [FLINK-19542][k8s]Implement LeaderElectionService and LeaderRetrievalService based on Kubernetes API

2020-10-21 Thread GitBox


wangyang0918 commented on a change in pull request #13644:
URL: https://github.com/apache/flink/pull/13644#discussion_r50975



##
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionService.java
##
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.highavailability;
+
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import 
org.apache.flink.kubernetes.kubeclient.KubernetesLeaderElectionConfiguration;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
+import 
org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
+import org.apache.flink.kubernetes.utils.KubernetesUtils;
+import org.apache.flink.runtime.leaderelection.AbstractLeaderElectionService;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.util.function.FunctionUtils;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static 
org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;
+import static org.apache.flink.kubernetes.utils.Constants.LEADER_ADDRESS_KEY;
+import static 
org.apache.flink.kubernetes.utils.Constants.LEADER_SESSION_ID_KEY;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Leader election service for multiple JobManagers. The active JobManager is 
elected using Kubernetes.
+ * The current leader's address as well as its leader session ID are published 
via a Kubernetes ConfigMap.
+ * Note that the contending lock and the leader storage use the same 
ConfigMap, and every component (e.g.
+ * ResourceManager, Dispatcher, RestEndpoint, JobManager for each job) will 
have a separate ConfigMap.
+ */
+public class KubernetesLeaderElectionService extends 
AbstractLeaderElectionService {
+
+   private final FlinkKubeClient kubeClient;
+
+   private final Executor executor;
+
+   private final String configMapName;
+
+   private final KubernetesLeaderElector leaderElector;
+
+   private KubernetesWatch kubernetesWatch;
+
+   // Labels will be used to clean up the HA-related ConfigMaps.
+   private Map<String, String> configMapLabels;
+
+   KubernetesLeaderElectionService(
+   FlinkKubeClient kubeClient,
+   Executor executor,
+   KubernetesLeaderElectionConfiguration leaderConfig) {
+
+   this.kubeClient = checkNotNull(kubeClient, "Kubernetes client 
should not be null.");
+   this.executor = checkNotNull(executor, "Executor should not be 
null.");
+   this.configMapName = leaderConfig.getConfigMapName();
+   this.leaderElector = 
kubeClient.createLeaderElector(leaderConfig, new LeaderCallbackHandlerImpl());
+   this.leaderContender = null;
+   this.configMapLabels = KubernetesUtils.getConfigMapLabels(
+   leaderConfig.getClusterId(), 
LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY);
+   }
+
+   @Override
+   public void internalStart(LeaderContender contender) {
+   CompletableFuture.runAsync(leaderElector::run, executor);
+   kubernetesWatch = kubeClient.watchConfigMaps(configMapName, new 
ConfigMapCallbackHandlerImpl());
+   }
+
+   @Override
+   public void internalStop() {
+   if (kubernetesWatch != null) {
+   kubernetesWatch.close();
+   }
+   }
+
+   @Override
+   protected void writeLeaderInformation() {
+   try {
+   kubeClient.checkAndUpdateConfigMap(
+   configMapName,
+   configMap -> {
+   if 
(leaderElector.hasLeadership(configMap)) {
+   // Get the updated ConfigMap 
with new leader information
+   

[GitHub] [flink] gm7y8 removed a comment on pull request #13458: FLINK-18851 [runtime] Add checkpoint type to checkpoint history entries in Web UI

2020-10-21 Thread GitBox


gm7y8 removed a comment on pull request #13458:
URL: https://github.com/apache/flink/pull/13458#issuecomment-714126117


   @flinkbot approve description



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-19743) Add Source metrics definitions

2020-10-21 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19743?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated FLINK-19743:
-
Summary: Add Source metrics definitions  (was: Add metrics definitions.)

> Add Source metrics definitions
> --
>
> Key: FLINK-19743
> URL: https://issues.apache.org/jira/browse/FLINK-19743
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Common
>Affects Versions: 1.11.2
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>Priority: Major
>
> Add the metrics defined in 
> [FLIP-33|https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics]
>  to {{OperatorMetricsGroup}} and {{SourceReaderContext}}
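
As a rough illustration of the API involved (the metric names come from 
FLIP-33; the wiring below is a sketch, not the final implementation):

{code:java}
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;

// Sketch: registering two FLIP-33 source metrics on a metric group.
public class SourceMetricsSketch {
    private final Counter numRecordsIn;
    private final Counter numRecordsInErrors;

    public SourceMetricsSketch(MetricGroup group) {
        this.numRecordsIn = group.counter("numRecordsIn");
        this.numRecordsInErrors = group.counter("numRecordsInErrors");
    }

    public void onRecordEmitted() {
        numRecordsIn.inc();
    }
}
{code}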



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] curcur edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

2020-10-21 Thread GitBox


curcur edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-714211577


   > * Visibility in normal case: none of the fields written in `releaseView` 
are `volatile`. So in normal case (`t1:release` then `t2:createReadView`) `t2` 
can see some inconsistent state. For example, `readView == null`, but 
`isPartialBufferCleanupRequired == false`. Right?
   >   Maybe call `releaseView()`  from `createReadView()` unconditionally?
   
   Aren't they guarded by the synchronization block?
   
   > * Overwrites when release is slow: won't `t1` overwrite changes to 
`PipelinedSubpartition` made already by `t2`? For example, reset 
`sequenceNumber` after `t2` has sent some data?
   >   Maybe `PipelinedSubpartition.readerView` should be `AtomicReference` and 
then we can guard `PipelinedApproximateSubpartition.releaseView()` by CAS on it?
   
   I think this is the same question I answered in the write-up: 
   In short, it won't be possible, because a view can only be released once and 
this is guarded by the release flag of the view, details quoted below. 
   
   - What if the netty thread1 release view after netty thread2 recreates the 
view?
   Thread2 releases the view that thread1 holds the reference on before 
creating a new view. Thread1 can not release the old view (through view 
reference) again afterwards, since a view can only be released once.
   
   I actually have an idea to simplify this whole model:
   **If we only release before creation and nowhere else, this whole 
threading interaction model would be greatly simplified. That is, only 
the new netty thread can release the view.**
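   
   A schematic sketch of the release-once guard described above (names are 
illustrative, not Flink's actual fields):
   
   ```java
   import java.util.concurrent.atomic.AtomicBoolean;
   
   // Sketch of a release-once guard: a stale reference held by netty thread 1
   // cannot release the view that netty thread 2 has already recreated, because
   // each view instance can be released at most once.
   class ReadViewSketch {
       private final AtomicBoolean released = new AtomicBoolean(false);
   
       boolean release() {
           // Only the first caller wins; later calls are no-ops.
           return released.compareAndSet(false, true);
       }
   }
   ```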



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] curcur edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

2020-10-21 Thread GitBox


curcur edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-714211577


   > * Visibility in normal case: none of the fields written in `releaseView` 
are `volatile`. So in normal case (`t1:release` then `t2:createReadView`) `t2` 
can see some inconsistent state. For example, `readView == null`, but 
`isPartialBufferCleanupRequired == false`. Right?
   >   Maybe call `releaseView()`  from `createReadView()` unconditionally?
   
   Aren't they guarded by the synchronization block?
   
   > * Overwrites when release is slow: won't `t1` overwrite changes to 
`PipelinedSubpartition` made already by `t2`? For example, reset 
`sequenceNumber` after `t2` has sent some data?
   >   Maybe `PipelinedSubpartition.readerView` should be `AtomicReference` and 
then we can guard `PipelinedApproximateSubpartition.releaseView()` by CAS on it?
   
   I think this is the same question I answered in the write-up: 
   In short, it won't be possible, because a view can only be released once and 
this is guarded by the release flag of the view, details quoted below. 
   
   - What if the netty thread1 release view after netty thread2 recreates the 
view?
   Thread2 releases the view that thread1 holds the reference on before 
creating a new view. Thread1 can not release the old view (through view 
reference) again afterwards, since a view can only be released once.
   
   I actually have an idea to simplify this whole model:
   **If we only release before creation, this whole threading interaction model 
would be greatly simplified. That is, only the new netty thread can 
release the view.**



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot commented on pull request #13739: [FLINK-19232][python] support iterating MapState and MapView

2020-10-21 Thread GitBox


flinkbot commented on pull request #13739:
URL: https://github.com/apache/flink/pull/13739#issuecomment-714232634


   
   ## CI report:
   
   * 9f75014b48308eb93937e11fcd8f4da80de12bd4 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13738: [FLINK-19757][flink-table-common] fix the local datetime inconsistent…

2020-10-21 Thread GitBox


flinkbot edited a comment on pull request #13738:
URL: https://github.com/apache/flink/pull/13738#issuecomment-714225386


   
   ## CI report:
   
   * 065898f3ea61da7479e614517c03acfbce10ebfb Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8073)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13721: [FLINK-19694][table] Support Upsert ChangelogMode for ScanTableSource

2020-10-21 Thread GitBox


flinkbot edited a comment on pull request #13721:
URL: https://github.com/apache/flink/pull/13721#issuecomment-713489063


   
   ## CI report:
   
   * 1e1971368a88e3457dbf60cecfe319d58649092d Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8025)
 
   * fa0c87c737cee61fe6fd5b6655916eb97d18aaeb Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8072)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (FLINK-19758) Implement a new unified File Sink based on the new Sink API

2020-10-21 Thread Yun Gao (Jira)
Yun Gao created FLINK-19758:
---

 Summary: Implement a new unified File Sink based on the new Sink 
API
 Key: FLINK-19758
 URL: https://issues.apache.org/jira/browse/FLINK-19758
 Project: Flink
  Issue Type: Sub-task
  Components: API / DataStream, Connectors / FileSystem
Affects Versions: 1.12.0
Reporter: Yun Gao
 Fix For: 1.12.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] curcur edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

2020-10-21 Thread GitBox


curcur edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-714211577


   > * Visibility in normal case: none of the fields written in `releaseView` 
are `volatile`. So in normal case (`t1:release` then `t2:createReadView`) `t2` 
can see some inconsistent state. For example, `readView == null`, but 
`isPartialBufferCleanupRequired == false`. Right?
   >   Maybe call `releaseView()`  from `createReadView()` unconditionally?
   
   Aren't they guarded by the synchronization block?
   
   > * Overwrites when release is slow: won't `t1` overwrite changes to 
`PipelinedSubpartition` made already by `t2`? For example, reset 
`sequenceNumber` after `t2` has sent some data?
   >   Maybe `PipelinedSubpartition.readerView` should be `AtomicReference` and 
then we can guard `PipelinedApproximateSubpartition.releaseView()` by CAS on it?
   
   I think this is the same question I answered in the write-up: 
   In short, it won't be possible, because a view can only be released once and 
this is guarded by the release flag of the view, details quoted below. 
   
   - What if the netty thread1 release view after netty thread2 recreates the 
view?
   Thread2 releases the view that thread1 holds the reference on before 
creating a new view. Thread1 can not release the old view (through view 
reference) again afterwards, since a view can only be released once.
   
   I actually have an idea to simplify this whole model:
   **If we only release before creation, this whole threading interaction model 
would be greatly simplified. That is, only the new netty thread can 
release the view.**
   
   I don't see any risk that would prevent us from doing this.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] curcur edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

2020-10-21 Thread GitBox


curcur edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-714211577


   > * Visibility in normal case: none of the fields written in `releaseView` 
are `volatile`. So in normal case (`t1:release` then `t2:createReadView`) `t2` 
can see some inconsistent state. For example, `readView == null`, but 
`isPartialBufferCleanupRequired == false`. Right?
   >   Maybe call `releaseView()`  from `createReadView()` unconditionally?
   
   Aren't they guarded by the synchronization block?
   
   > * Overwrites when release is slow: won't `t1` overwrite changes to 
`PipelinedSubpartition` made already by `t2`? For example, reset 
`sequenceNumber` after `t2` has sent some data?
   >   Maybe `PipelinedSubpartition.readerView` should be `AtomicReference` and 
then we can guard `PipelinedApproximateSubpartition.releaseView()` by CAS on it?
   
   I think this is the same question I answered in the write-up: 
   In short, it won't be possible, because a view can only be released once and 
this is guarded by the release flag of the view, details quoted below. 
   
   - What if the netty thread1 release view after netty thread2 recreates the 
view?
   Thread2 releases the view that thread1 holds the reference on before 
creating a new view. Thread1 can not release the old view (through view 
reference) again afterwards, since a view can only be released once.
   
   **And if we only release before creation, this whole threading interaction 
model would be greatly simplified. That is, only the new netty thread 
can release the view.**
   
   I don't see any risk that would prevent us from doing this.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot commented on pull request #13739: [FLINK-19232][python] support iterating MapState and MapView

2020-10-21 Thread GitBox


flinkbot commented on pull request #13739:
URL: https://github.com/apache/flink/pull/13739#issuecomment-714227770


   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit 9f75014b48308eb93937e11fcd8f4da80de12bd4 (Thu Oct 22 
05:01:49 UTC 2020)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required. Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] WeiZhong94 opened a new pull request #13739: [FLINK-19232][python] support iterating MapState and MapView

2020-10-21 Thread GitBox


WeiZhong94 opened a new pull request #13739:
URL: https://github.com/apache/flink/pull/13739


   ## What is the purpose of the change
   
   *This pull request supports iterating MapState and MapView*
   
   ## Brief change log
   
 - *Support iterating MapState and MapView*
   
   ## Verifying this change
   
   This change is already covered by existing tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (docs)
   
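   For reference, a hedged sketch of the Java-API analogue of what this PR adds 
on the Python side (state wiring omitted; `MapState#entries()` and 
`MapState#keys()` are the existing Java calls):
   
   ```java
   import java.util.Map;
   
   import org.apache.flink.api.common.state.MapState;
   
   // Sketch: iterating a keyed MapState<String, Long>, mirroring what this PR
   // enables for PyFlink's MapState and MapView.
   public class MapStateIterationSketch {
       void dump(MapState<String, Long> state) throws Exception {
           for (Map.Entry<String, Long> entry : state.entries()) {
               System.out.println(entry.getKey() + " -> " + entry.getValue());
           }
           for (String key : state.keys()) {
               // keys(), values() and iterator() provide the other views
           }
       }
   }
   ```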



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot commented on pull request #13738: [FLINK-19757][flink-table-common] fix the local datetime inconsistent…

2020-10-21 Thread GitBox


flinkbot commented on pull request #13738:
URL: https://github.com/apache/flink/pull/13738#issuecomment-714225386


   
   ## CI report:
   
   * 065898f3ea61da7479e614517c03acfbce10ebfb UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13729: [FLINK-19644][hive] Support read specific partition of Hive table in temporal join

2020-10-21 Thread GitBox


flinkbot edited a comment on pull request #13729:
URL: https://github.com/apache/flink/pull/13729#issuecomment-713673157


   
   ## CI report:
   
   * f028c8d52466fbd6fd5a9cbf3ed85dcbc65fd2ac Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8068)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13736: [FLINK-19654][python][e2e] Reduce pyflink e2e test parallelism

2020-10-21 Thread GitBox


flinkbot edited a comment on pull request #13736:
URL: https://github.com/apache/flink/pull/13736#issuecomment-714183394


   
   ## CI report:
   
   * 259c893dcb4b5692df54cfe7739f95d9e34096e6 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8056)
 
   * 111ec785929a0742b46ea98408f585aa03314b1d Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8070)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13721: [FLINK-19694][table] Support Upsert ChangelogMode for ScanTableSource

2020-10-21 Thread GitBox


flinkbot edited a comment on pull request #13721:
URL: https://github.com/apache/flink/pull/13721#issuecomment-713489063


   
   ## CI report:
   
   * 1e1971368a88e3457dbf60cecfe319d58649092d Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8025)
 
   * fa0c87c737cee61fe6fd5b6655916eb97d18aaeb UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13653: [FLINK-17528][table] Remove RowData#get() and ArrayData#get() and use FieldGetter and ElementGetter instead

2020-10-21 Thread GitBox


flinkbot edited a comment on pull request #13653:
URL: https://github.com/apache/flink/pull/13653#issuecomment-709337488


   
   ## CI report:
   
   * cadf9a2a13d3113395f199431881fff216b9a50a Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7974)
 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7958)
 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8017)
 
   * 0c876d6befc487412629e6a1e8883fa5fbeb31e6 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8067)
 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8071)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13581: [FLINK-17331] Explicitly get the ByteBuf length of all classes which …

2020-10-21 Thread GitBox


flinkbot edited a comment on pull request #13581:
URL: https://github.com/apache/flink/pull/13581#issuecomment-706519397


   
   ## CI report:
   
   * 1e187203b15559b08d15a0cca949ca15a8a3bafe Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7383)
 
   * fcc301fbb0d04ada457d2c39d325ede52cc0db8d Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8069)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] wuchong commented on pull request #13653: [FLINK-17528][table] Remove RowData#get() and ArrayData#get() and use FieldGetter and ElementGetter instead

2020-10-21 Thread GitBox


wuchong commented on pull request #13653:
URL: https://github.com/apache/flink/pull/13653#issuecomment-714215751


   @flinkbot run azure



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-19757) TimeStampData can cause time inconsistent problem

2020-10-21 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17218728#comment-17218728
 ] 

Jark Wu commented on FLINK-19757:
-

Hi [~zhoujira86], the current implementation is correct. 

The conversion between {{LocalDateTime}} and {{TimestampData}} happens when the 
SQL type is {{TIMESTAMP}}. 
The {{TIMESTAMP}} type is a value without a time zone, which has the same semantics as 
{{LocalDateTime}}. 
We store such data in Flink as the time since the epoch ({{1970-01-01 00:00:00}}). 
Therefore, we shouldn't add time zone offsets in the conversion methods. 
Otherwise, the logic is wrong and tests will fail. 
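To illustrate, here is a minimal sketch of the intended semantics (not Flink's actual {{TimestampData}} implementation): converting through a fixed {{ZoneOffset.UTC}} makes the round trip lossless and independent of the JVM's default time zone, which is exactly why no offset may be added.

{code:java}
import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class TimestampRoundTrip {

    // Interpret epoch millis as a zone-less TIMESTAMP value (sketch only).
    static LocalDateTime toLocalDateTime(long epochMillis) {
        long epochSecond = Math.floorDiv(epochMillis, 1000L);
        int nanoOfSecond = (int) Math.floorMod(epochMillis, 1000L) * 1_000_000;
        // A fixed UTC offset: the same millis always map to the same
        // LocalDateTime, regardless of the machine's default time zone.
        return LocalDateTime.ofEpochSecond(epochSecond, nanoOfSecond, ZoneOffset.UTC);
    }

    static long fromLocalDateTime(LocalDateTime dt) {
        return dt.toEpochSecond(ZoneOffset.UTC) * 1000L + dt.getNano() / 1_000_000;
    }

    public static void main(String[] args) {
        long millis = 1_603_339_200_123L;
        // Prints true: the round trip is lossless at millisecond precision.
        System.out.println(fromLocalDateTime(toLocalDateTime(millis)) == millis);
    }
}
{code}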

> TimeStampData can cause time inconsistent problem
> -
>
> Key: FLINK-19757
> URL: https://issues.apache.org/jira/browse/FLINK-19757
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.11.1
>Reporter: xiaogang zhou
>Priority: Major
>  Labels: pull-request-available
>
> When we check the JDK LocalDateTime code, we find that
>  
> {code:java}
> // code placeholder
> public static LocalDateTime ofEpochSecond(long epochSecond, int nanoOfSecond, 
> ZoneOffset offset) {
> Objects.requireNonNull(offset, "offset");
> NANO_OF_SECOND.checkValidValue(nanoOfSecond);
> long localSecond = epochSecond + offset.getTotalSeconds();  // overflow 
> caught later
> long localEpochDay = Math.floorDiv(localSecond, SECONDS_PER_DAY);
> int secsOfDay = (int)Math.floorMod(localSecond, SECONDS_PER_DAY);
> LocalDate date = LocalDate.ofEpochDay(localEpochDay);
> LocalTime time = LocalTime.ofNanoOfDay(secsOfDay * NANOS_PER_SECOND + 
> nanoOfSecond);
> return new LocalDateTime(date, time);
> }
> {code}
>  
> With offset.getTotalSeconds() they add the zone offset, but in 
> TimestampData#toLocalDateTime we don't add an offset.
>  
> I'd like to add a TimeZone.getDefault().getRawOffset() in 
> toLocalDateTime()
> and subtract a TimeZone.getDefault().getRawOffset() in 
> fromLocalDateTime



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] curcur edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

2020-10-21 Thread GitBox


curcur edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-714211577


   > * Visibility in normal case: none of the fields written in `releaseView` 
are `volatile`. So in the normal case (`t1:release` then `t2:createReadView`) 
`t2` can see some inconsistent state. For example, `readView == null`, but 
`isPartialBufferCleanupRequired == false`. Right?
   >   Maybe call `releaseView()` from `createReadView()` unconditionally?
   
   That's true. In that case, let's not call `releaseView()` during downstream 
task cancellation, and do `releaseView()` only before creating a new view? 
   
   > * Overwrites when release is slow: won't `t1` overwrite changes to 
`PipelinedSubpartition` made already by `t2`? For example, reset 
`sequenceNumber` after `t2` has sent some data?
   >   Maybe `PipelinedSubpartition.readerView` should be an `AtomicReference`, 
and then we can guard `PipelinedApproximateSubpartition.releaseView()` by CAS 
on it?
   
   I think this is the same question I answered in the write-up: 
   In short, it won't be possible, because a view can only be released once, 
and this is guarded by the release flag of the view; details quoted below. 
   
   - What if netty thread1 releases the view after netty thread2 recreates the 
view?
   Thread2 releases the view that thread1 holds a reference to before creating 
a new view. Thread1 cannot release the old view (through the view reference) 
again afterwards, since a view can only be released once.
   
   **And if we only release before creation, this whole threading interaction 
model would be greatly simplified. That is, only the new netty thread can 
release the view.**
   
   I don't see any risk that would prevent us from doing this.
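   For illustration, a minimal sketch of the CAS-guarded variant suggested above — the class and field names here are hypothetical, not the actual `PipelinedApproximateSubpartition` code:

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: guard view release with compare-and-set so that a
// stale netty thread can never release a view created after its own.
class ApproximateSubpartition {

    private final AtomicReference<ReadView> readView = new AtomicReference<>();

    // Called by the netty thread handling a (re)connect: release whatever
    // view is currently installed, then install a fresh one.
    ReadView createReadView() {
        releaseView(readView.get());
        ReadView view = new ReadView();
        readView.set(view);
        return view;
    }

    // The CAS succeeds only for a caller holding the current view; a second
    // release attempt with a stale reference is a no-op.
    void releaseView(ReadView expected) {
        if (expected != null && readView.compareAndSet(expected, null)) {
            expected.release();
        }
    }

    static class ReadView {
        void release() {
            // free buffers, mark partial-buffer cleanup required, ...
        }
    }
}
```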



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] curcur edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

2020-10-21 Thread GitBox


curcur edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-714211577


   > * Visibility in normal case: none of the fields written in `releaseView` 
are `volatile`. So in the normal case (`t1:release` then `t2:createReadView`) 
`t2` can see some inconsistent state. For example, `readView == null`, but 
`isPartialBufferCleanupRequired == false`. Right?
   >   Maybe call `releaseView()` from `createReadView()` unconditionally?
   
   That's true. In that case, let's not call `releaseView()` during downstream 
task cancellation, and do `releaseView()` only before creating a new view? 
   
   > * Overwrites when release is slow: won't `t1` overwrite changes to 
`PipelinedSubpartition` made already by `t2`? For example, reset 
`sequenceNumber` after `t2` has sent some data?
   >   Maybe `PipelinedSubpartition.readerView` should be an `AtomicReference`, 
and then we can guard `PipelinedApproximateSubpartition.releaseView()` by CAS 
on it?
   
   I think this is the same question I answered in the write-up: 
   In short, it won't be possible, because a view can only be released once, 
and this is guarded by the release flag of the view; details quoted below. 
   
   - What if netty thread1 releases the view after netty thread2 recreates the 
view?
   Thread2 releases the view that thread1 holds a reference to before creating 
a new view. Thread1 cannot release the old view (through the view reference) 
again afterwards, since a view can only be released once.
   
   **And if we only release before creation, this whole threading interaction 
model would be greatly simplified. That is, only one netty thread can 
release the view.**
   
   I don't see any risk that would prevent us from doing this.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] curcur edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

2020-10-21 Thread GitBox


curcur edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-714211577


   > * Visibility in normal case: none of the fields written in `releaseView` 
are `volatile`. So in the normal case (`t1:release` then `t2:createReadView`) 
`t2` can see some inconsistent state. For example, `readView == null`, but 
`isPartialBufferCleanupRequired == false`. Right?
   >   Maybe call `releaseView()` from `createReadView()` unconditionally?
   
   That's true. In that case, let's not call `releaseView()` during downstream 
task cancellation, and do `releaseView()` only before creating a new view? 
   
   > * Overwrites when release is slow: won't `t1` overwrite changes to 
`PipelinedSubpartition` made already by `t2`? For example, reset 
`sequenceNumber` after `t2` has sent some data?
   >   Maybe `PipelinedSubpartition.readerView` should be an `AtomicReference`, 
and then we can guard `PipelinedApproximateSubpartition.releaseView()` by CAS 
on it?
   
   I think this is the same question I answered in the write-up: 
   In short, it won't be possible, because a view can only be released once, 
and this is guarded by the release flag of the view; details quoted below. 
   
   - What if netty thread1 releases the view after netty thread2 recreates the 
view?
   Thread2 releases the view that thread1 holds a reference to before creating 
a new view. Thread1 cannot release the old view (through the view reference) 
again afterwards, since a view can only be released once.
   
   **And if we only release before creation, this whole threading interaction 
model would be greatly simplified.**
   
   I don't see any risk that would prevent us from doing this.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-19687) Support to get execution plan in `StatementSet`

2020-10-21 Thread xiaozilong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17218724#comment-17218724
 ] 

xiaozilong commented on FLINK-19687:


cc [~godfreyhe]

> Support to get execution plan in `StatementSet`
> ---
>
> Key: FLINK-19687
> URL: https://issues.apache.org/jira/browse/FLINK-19687
> Project: Flink
>  Issue Type: Wish
>  Components: Table SQL / API
>Affects Versions: 1.11.0
>Reporter: xiaozilong
>Priority: Major
>
> Hi, I want to get a job's execution plan in Flink SQL 1.11, but I get the 
> exception "No operators defined in streaming topology. Cannot execute." when 
> using `env.getExecutionPlan()`. The same code runs fine in Flink SQL 1.10. I 
> found that translation only happens when StatementSet.execute() is 
> called in Flink SQL 1.11. So we cannot get a job's execution plan before the 
> job is submitted? Can we support getting the execution plan in `StatementSet`, 
> or invoke `TableEnvironmentImpl#translate` in `StatementSetImpl#addInsertSql`? I 
> think the latter is better so that we can reuse `env.getExecutionPlan()`.
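
For reference, a sketch of what may already be possible on the FLIP-84 API in 1.11 ({{my_source}}/{{my_sink}} are made-up table names): {{StatementSet#explain}} returns the AST and the optimized plans as text without submitting a job, though not the JSON graph that {{env.getExecutionPlan()}} returns.

{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class ExplainStatementSet {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        StatementSet stmtSet = tEnv.createStatementSet();
        stmtSet.addInsertSql("INSERT INTO my_sink SELECT id, name FROM my_source");

        // Prints the abstract syntax tree, the optimized logical plan and the
        // physical execution plan; no job is submitted until execute() is called.
        System.out.println(stmtSet.explain());
    }
}
{code}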



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] curcur edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

2020-10-21 Thread GitBox


curcur edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-714211577


   > * Visibility in normal case: none of the fields written in `releaseView` 
are `volatile`. So in the normal case (`t1:release` then `t2:createReadView`) 
`t2` can see some inconsistent state. For example, `readView == null`, but 
`isPartialBufferCleanupRequired == false`. Right?
   >   Maybe call `releaseView()` from `createReadView()` unconditionally?
   
   That's true. In that case, let's not call `releaseView()` during downstream 
task cancellation, and do `releaseView()` only before creating a new view? 
   
   > * Overwrites when release is slow: won't `t1` overwrite changes to 
`PipelinedSubpartition` made already by `t2`? For example, reset 
`sequenceNumber` after `t2` has sent some data?
   >   Maybe `PipelinedSubpartition.readerView` should be an `AtomicReference`, 
and then we can guard `PipelinedApproximateSubpartition.releaseView()` by CAS 
on it?
   
   I think this is the same question I answered in the write-up: 
   In short, it won't be possible, because a view can only be released once, 
and this is guarded by the release flag of the view; details quoted below. 
   
   - What if netty thread1 releases the view after netty thread2 recreates the 
view?
   Thread2 releases the view that thread1 holds a reference to before creating 
a new view. Thread1 cannot release the old view (through the view reference) 
again afterwards, since a view can only be released once.
   
   And if we only release before creation, this whole threading interaction 
model would be greatly simplified.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] curcur commented on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

2020-10-21 Thread GitBox


curcur commented on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-714211577


   > * Visibility in normal case: none of the fields written in `releaseView` 
are `volatile`. So in the normal case (`t1:release` then `t2:createReadView`) 
`t2` can see some inconsistent state. For example, `readView == null`, but 
`isPartialBufferCleanupRequired == false`. Right?
   >   Maybe call `releaseView()` from `createReadView()` unconditionally?
   
   That's true. In that case, let's not call `releaseView()` during downstream 
task cancellation, and do `releaseView()` only before creating a new view? 
   
   > * Overwrites when release is slow: won't `t1` overwrite changes to 
`PipelinedSubpartition` made already by `t2`? For example, reset 
`sequenceNumber` after `t2` has sent some data?
   >   Maybe `PipelinedSubpartition.readerView` should be an `AtomicReference`, 
and then we can guard `PipelinedApproximateSubpartition.releaseView()` by CAS 
on it?
   
   I think this is the same question I answered in the write-up: in short, it 
won't be possible, because a view can only be released once, and this is guarded 
by the release flag of the view; details quoted below. 
   
   - What if netty thread1 releases the view after netty thread2 recreates the 
view?
   Thread2 releases the view that thread1 holds a reference to before creating 
a new view. Thread1 cannot release the old view (through the view reference) 
again afterwards, since a view can only be released once.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot commented on pull request #13738: [FLINK-19757][flink-table-common] fix the local datetime inconsistent…

2020-10-21 Thread GitBox


flinkbot commented on pull request #13738:
URL: https://github.com/apache/flink/pull/13738#issuecomment-714211013


   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit 065898f3ea61da7479e614517c03acfbce10ebfb (Thu Oct 22 
04:13:34 UTC 2020)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
* **This pull request references an unassigned [Jira 
ticket](https://issues.apache.org/jira/browse/FLINK-19757).** According to the 
[code contribution 
guide](https://flink.apache.org/contributing/contribute-code.html), tickets 
need to be assigned before starting with the implementation work.
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-19757) TimeStampData can cause time inconsistent problem

2020-10-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19757?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-19757:
---
Labels: pull-request-available  (was: )

> TimeStampData can cause time inconsistent problem
> -
>
> Key: FLINK-19757
> URL: https://issues.apache.org/jira/browse/FLINK-19757
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.11.1
>Reporter: xiaogang zhou
>Priority: Major
>  Labels: pull-request-available
>
> When we check the JDK LocalDateTime code, we find that
>  
> {code:java}
> // code placeholder
> public static LocalDateTime ofEpochSecond(long epochSecond, int nanoOfSecond, 
> ZoneOffset offset) {
> Objects.requireNonNull(offset, "offset");
> NANO_OF_SECOND.checkValidValue(nanoOfSecond);
> long localSecond = epochSecond + offset.getTotalSeconds();  // overflow 
> caught later
> long localEpochDay = Math.floorDiv(localSecond, SECONDS_PER_DAY);
> int secsOfDay = (int)Math.floorMod(localSecond, SECONDS_PER_DAY);
> LocalDate date = LocalDate.ofEpochDay(localEpochDay);
> LocalTime time = LocalTime.ofNanoOfDay(secsOfDay * NANOS_PER_SECOND + 
> nanoOfSecond);
> return new LocalDateTime(date, time);
> }
> {code}
>  
> With offset.getTotalSeconds() they add the zone offset, but in 
> TimestampData#toLocalDateTime we don't add an offset.
>  
> I'd like to add a TimeZone.getDefault().getRawOffset() in 
> toLocalDateTime()
> and subtract a TimeZone.getDefault().getRawOffset() in 
> fromLocalDateTime



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on pull request #13736: [FLINK-19654][python][e2e] Reduce pyflink e2e test parallelism

2020-10-21 Thread GitBox


flinkbot edited a comment on pull request #13736:
URL: https://github.com/apache/flink/pull/13736#issuecomment-714183394


   
   ## CI report:
   
   * 259c893dcb4b5692df54cfe7739f95d9e34096e6 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8056)
 
   * 111ec785929a0742b46ea98408f585aa03314b1d UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] zhougit86 opened a new pull request #13738: [FLINK-19757][flink-table-common] fix the local datetime inconsistent…

2020-10-21 Thread GitBox


zhougit86 opened a new pull request #13738:
URL: https://github.com/apache/flink/pull/13738


   … problem
   
   
   
   ## What is the purpose of the change
   
   Fix the TimestampData time-inconsistency problem.
   
   
   ## Brief change log
   
   Add the offset in toLocalDateTime.
   Subtract the offset in fromLocalDateTime.
   
   
   ## Verifying this change
   
   
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (no)
     - If yes, how is the feature documented? (not applicable)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13729: [FLINK-19644][hive] Support read specific partition of Hive table in temporal join

2020-10-21 Thread GitBox


flinkbot edited a comment on pull request #13729:
URL: https://github.com/apache/flink/pull/13729#issuecomment-713673157


   
   ## CI report:
   
   * 659b57b42486d3b79197653fec933ce42766388e Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8033)
 
   * f028c8d52466fbd6fd5a9cbf3ed85dcbc65fd2ac Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8068)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13623: [FLINK-19606][table-runtime] Implement streaming window join operator

2020-10-21 Thread GitBox


flinkbot edited a comment on pull request #13623:
URL: https://github.com/apache/flink/pull/13623#issuecomment-708132144


   
   ## CI report:
   
   * 1de96f52dee7fc493ca0cad827189955520362cc Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8066)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13653: [FLINK-17528][table] Remove RowData#get() and ArrayData#get() and use FieldGetter and ElementGetter instead

2020-10-21 Thread GitBox


flinkbot edited a comment on pull request #13653:
URL: https://github.com/apache/flink/pull/13653#issuecomment-709337488


   
   ## CI report:
   
   * cadf9a2a13d3113395f199431881fff216b9a50a Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7974)
 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7958)
 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8017)
 
   * 0c876d6befc487412629e6a1e8883fa5fbeb31e6 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8067)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13581: [FLINK-17331] Explicitly get the ByteBuf length of all classes which …

2020-10-21 Thread GitBox


flinkbot edited a comment on pull request #13581:
URL: https://github.com/apache/flink/pull/13581#issuecomment-706519397


   
   ## CI report:
   
   * 1e187203b15559b08d15a0cca949ca15a8a3bafe Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7383)
 
   * fcc301fbb0d04ada457d2c39d325ede52cc0db8d UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-19757) TimeStampData can cause time inconsistent problem

2020-10-21 Thread xiaogang zhou (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19757?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xiaogang zhou updated FLINK-19757:
--
Description: 
When we check the JDK LocalDateTime code, we find that

 
{code:java}
// code placeholder


public static LocalDateTime ofEpochSecond(long epochSecond, int nanoOfSecond, 
ZoneOffset offset) {
Objects.requireNonNull(offset, "offset");
NANO_OF_SECOND.checkValidValue(nanoOfSecond);
long localSecond = epochSecond + offset.getTotalSeconds();  // overflow 
caught later
long localEpochDay = Math.floorDiv(localSecond, SECONDS_PER_DAY);
int secsOfDay = (int)Math.floorMod(localSecond, SECONDS_PER_DAY);
LocalDate date = LocalDate.ofEpochDay(localEpochDay);
LocalTime time = LocalTime.ofNanoOfDay(secsOfDay * NANOS_PER_SECOND + 
nanoOfSecond);
return new LocalDateTime(date, time);
}
{code}
 

With offset.getTotalSeconds() they add the zone offset, but in 
TimestampData#toLocalDateTime we don't add an offset.

I'd like to add a TimeZone.getDefault().getRawOffset() in toLocalDateTime()
and subtract a TimeZone.getDefault().getRawOffset() in fromLocalDateTime

  was:
when we check jdk LocalDateTime code,we find that

public static LocalDateTime ofEpochSecond(long epochSecond, int nanoOfSecond, ZoneOffset offset) {
    Objects.requireNonNull(offset, "offset");
    NANO_OF_SECOND.checkValidValue(nanoOfSecond);
    long localSecond = epochSecond + offset.getTotalSeconds();  // overflow caught later
    long localEpochDay = Math.floorDiv(localSecond, SECONDS_PER_DAY);
    int secsOfDay = (int) Math.floorMod(localSecond, SECONDS_PER_DAY);
    LocalDate date = LocalDate.ofEpochDay(localEpochDay);
    LocalTime time = LocalTime.ofNanoOfDay(secsOfDay * NANOS_PER_SECOND + nanoOfSecond);
    return new LocalDateTime(date, time);
}

 

With offset.getTotalSeconds() they add the zone offset, but in 
TimestampData#toLocalDateTime we don't add an offset.

I'd like to add a TimeZone.getDefault().getRawOffset() in toLocalDateTime()
and subtract a TimeZone.getDefault().getRawOffset() in fromLocalDateTime


> TimeStampData can cause time inconsistent problem
> -
>
> Key: FLINK-19757
> URL: https://issues.apache.org/jira/browse/FLINK-19757
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.11.1
>Reporter: xiaogang zhou
>Priority: Major
>
> When we check the JDK LocalDateTime code, we find that
>  
> {code:java}
> // code placeholder
> public static LocalDateTime ofEpochSecond(long epochSecond, int nanoOfSecond, 
> ZoneOffset offset) {
> Objects.requireNonNull(offset, "offset");
> NANO_OF_SECOND.checkValidValue(nanoOfSecond);
> long localSecond = epochSecond + offset.getTotalSeconds();  // overflow 
> caught later
> long localEpochDay = Math.floorDiv(localSecond, SECONDS_PER_DAY);
> int secsOfDay = (int)Math.floorMod(localSecond, SECONDS_PER_DAY);
> LocalDate date = LocalDate.ofEpochDay(localEpochDay);
> LocalTime time = LocalTime.ofNanoOfDay(secsOfDay * NANOS_PER_SECOND + 
> nanoOfSecond);
> return new LocalDateTime(date, time);
> }
> {code}
>  
> With offset.getTotalSeconds() they add the zone offset, but in 
> TimestampData#toLocalDateTime we don't add an offset.
>  
> I'd like to add a TimeZone.getDefault().getRawOffset() in 
> toLocalDateTime()
> and subtract a TimeZone.getDefault().getRawOffset() in 
> fromLocalDateTime



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-19757) TimeStampData can cause time inconsistent problem

2020-10-21 Thread xiaogang zhou (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19757?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xiaogang zhou updated FLINK-19757:
--
Description: 
When we check the JDK LocalDateTime code, we find that

public static LocalDateTime ofEpochSecond(long epochSecond, int nanoOfSecond, ZoneOffset offset) {
    Objects.requireNonNull(offset, "offset");
    NANO_OF_SECOND.checkValidValue(nanoOfSecond);
    long localSecond = epochSecond + offset.getTotalSeconds();  // overflow caught later
    long localEpochDay = Math.floorDiv(localSecond, SECONDS_PER_DAY);
    int secsOfDay = (int) Math.floorMod(localSecond, SECONDS_PER_DAY);
    LocalDate date = LocalDate.ofEpochDay(localEpochDay);
    LocalTime time = LocalTime.ofNanoOfDay(secsOfDay * NANOS_PER_SECOND + nanoOfSecond);
    return new LocalDateTime(date, time);
}

 

With offset.getTotalSeconds() they add the zone offset, but in 
TimestampData#toLocalDateTime we don't add an offset.

I'd like to add a TimeZone.getDefault().getRawOffset() in toLocalDateTime()
and subtract a TimeZone.getDefault().getRawOffset() in fromLocalDateTime

  was:
When we check the JDK LocalDateTime code, we find that

public static LocalDateTime ofEpochSecond(long epochSecond, int nanoOfSecond, ZoneOffset offset) {
    Objects.requireNonNull(offset, "offset");
    NANO_OF_SECOND.checkValidValue(nanoOfSecond);
    long localSecond = epochSecond + offset.getTotalSeconds();  // overflow caught later
    long localEpochDay = Math.floorDiv(localSecond, SECONDS_PER_DAY);
    int secsOfDay = (int) Math.floorMod(localSecond, SECONDS_PER_DAY);
    LocalDate date = LocalDate.ofEpochDay(localEpochDay);
    LocalTime time = LocalTime.ofNanoOfDay(secsOfDay * NANOS_PER_SECOND + nanoOfSecond);
    return new LocalDateTime(date, time);
}

 

With offset.getTotalSeconds() they add the zone offset, but in 
TimestampData#toLocalDateTime we don't add an offset.

I'd like to add a TimeZone.getDefault().getRawOffset() in toLocalDateTime()
and subtract a TimeZone.getDefault().getRawOffset() in fromLocalDateTime


> TimeStampData can cause time inconsistent problem
> -
>
> Key: FLINK-19757
> URL: https://issues.apache.org/jira/browse/FLINK-19757
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.11.1
>Reporter: xiaogang zhou
>Priority: Major
>
> When we check the JDK LocalDateTime code, we find that
> public static LocalDateTime ofEpochSecond(long epochSecond, int nanoOfSecond, ZoneOffset offset) {
> Objects.requireNonNull(offset, "offset");
> NANO_OF_SECOND.checkValidValue(nanoOfSecond);
> long localSecond = epochSecond + offset.getTotalSeconds();  // overflow caught later
> long localEpochDay = Math.floorDiv(localSecond, SECONDS_PER_DAY);
> int secsOfDay = (int) Math.floorMod(localSecond, SECONDS_PER_DAY);
> LocalDate date = LocalDate.ofEpochDay(localEpochDay);
> LocalTime time = LocalTime.ofNanoOfDay(secsOfDay * NANOS_PER_SECOND + nanoOfSecond);
> return new LocalDateTime(date, time);
> }
>  
> With offset.getTotalSeconds() they add the zone offset, but in 
> TimestampData#toLocalDateTime we don't add an offset.
>  
> I'd like to add a TimeZone.getDefault().getRawOffset() in 
> toLocalDateTime()
> and subtract a TimeZone.getDefault().getRawOffset() in 
> fromLocalDateTime



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19757) TimeStampData can cause time inconsistent problem

2020-10-21 Thread xiaogang zhou (Jira)
xiaogang zhou created FLINK-19757:
-

 Summary: TimeStampData can cause time inconsistent problem
 Key: FLINK-19757
 URL: https://issues.apache.org/jira/browse/FLINK-19757
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Runtime
Affects Versions: 1.11.1
Reporter: xiaogang zhou


When we check the JDK LocalDateTime code, we find that

public static LocalDateTime ofEpochSecond(long epochSecond, int nanoOfSecond, ZoneOffset offset) {
    Objects.requireNonNull(offset, "offset");
    NANO_OF_SECOND.checkValidValue(nanoOfSecond);
    long localSecond = epochSecond + offset.getTotalSeconds();  // overflow caught later
    long localEpochDay = Math.floorDiv(localSecond, SECONDS_PER_DAY);
    int secsOfDay = (int) Math.floorMod(localSecond, SECONDS_PER_DAY);
    LocalDate date = LocalDate.ofEpochDay(localEpochDay);
    LocalTime time = LocalTime.ofNanoOfDay(secsOfDay * NANOS_PER_SECOND + nanoOfSecond);
    return new LocalDateTime(date, time);
}

 

With offset.getTotalSeconds() they add the zone offset, but in 
TimestampData#toLocalDateTime we don't add an offset.

I'd like to add a TimeZone.getDefault().getRawOffset() in toLocalDateTime()
and subtract a TimeZone.getDefault().getRawOffset() in fromLocalDateTime



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] curcur edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

2020-10-21 Thread GitBox


curcur edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-713341581


   Thanks @rkhachatryan so much for the great question on how different threads 
access the same view (in other words, the threading model of the netty + task 
thread interaction on the view). Thanks @zhijiangW a lot for the explanation of 
the underlying threading model, based on offline discussion, and the great 
insights.
   
   I've spent some time on the different paths by which the view is 
created/released in different scenarios; here is what I think is happening. 
@zhijiangW , please correct me if I miss anything, thanks!
   
   For a streaming job, each `view` is created through a 
PartitionRequestServerHandler (one of the netty worker threads) handling a 
PartitionRequest. The created view is owned by the subPartition, and a reference 
to the view is returned and bound to a reader. The reader belongs to the 
PartitionRequestServerHandler.
   
   `PipelinedApproximateSubpartition#releaseView` is used in two places:
   1. PipelinedApproximateSubpartition#createView (from a netty thread, which 
can see the current version of the view belonging to the subpartition)
   2. PipelinedApproximateSubpartition#releaseAllResources,
	which is called from two places: 
	- one from the `reader`: the reader uses its own reference to the view to 
release resources. This happens on downstream task cancellation. 
	- one from `subPartition release`: this is fine, the subpartition 
releases its own view.
   
   Two questions:
   1. Does the old view continue to read data if it is not disposed of 
successfully before the new view is created?
   - No, because the reader and the view are removed upon the downstream 
task's cancellation request;
   
   2. What if netty thread1 releases the view after netty thread2 recreates the 
view?
   - Thread2 releases the view that thread1 holds a reference to before 
creating a new view. Thread1 cannot release the old view (through the view 
reference) again afterwards, since a view can only be released once.
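   As an illustration of the "released once" guard mentioned above (hypothetical names, not the actual view code): once the flag flips, every later release attempt — e.g. from a reader holding a stale reference — is a no-op.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch: a view whose release is idempotent, so a stale reference cannot
// free resources a second time after the view has been recreated.
class PipelinedView {

    private final AtomicBoolean released = new AtomicBoolean(false);

    void releaseAllResources() {
        // Only the first caller flips the flag and actually frees resources.
        if (released.compareAndSet(false, true)) {
            // recycle buffers, notify the subpartition, ...
        }
    }

    boolean isReleased() {
        return released.get();
    }
}
```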



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] curcur edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

2020-10-21 Thread GitBox


curcur edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-713341581


   Thanks @rkhachatryan so much for the great question on how different threads 
access the same view (in other words, the threading model of the netty + task 
thread interaction on the view). Thanks @zhijiangW a lot for the explanation of 
the underlying threading model, based on offline discussion, and the great 
insights.
   
   I've spent some time on the different paths by which the view is 
created/released in different scenarios; here is what I think is happening. 
@zhijiangW , please correct me if I miss anything, thanks!
   
   For a streaming job, each `view` is created through a 
PartitionRequestServerHandler (one of the netty worker threads) handling a 
PartitionRequest. The created view is owned by the subPartition, and a reference 
to the view is returned and bound to a reader. The reader belongs to the 
PartitionRequestServerHandler.
   
   `PipelinedApproximateSubpartition#releaseView` is used in two places:
   1. PipelinedApproximateSubpartition#createView (from a netty thread, which 
can see the current version of the view belonging to the subpartition)
   2. PipelinedApproximateSubpartition#releaseAllResources,
	which is called from two places: 
	- one from the `reader`: the reader uses its own reference to the view to 
release resources. This happens on downstream task cancellation. 
	- one from `subPartition release`: this is fine, the subpartition 
releases its own view.
   
   Two questions:
   1. Does the old view continue to read data if it is not disposed of 
successfully before the new view is created?
   - No, because the reader and the view are removed upon the downstream 
task's cancellation request;
   
   2. What if netty thread1 releases the view after netty thread2 recreates the 
view?
   - Thread2 releases the view that thread1 holds a reference to before 
creating a new view. Thread1 cannot release the view (through the view 
reference) again afterwards, since a view can only be released once.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-19754) Cannot have more than one execute() or executeAsync() call in a single environment.

2020-10-21 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17218721#comment-17218721
 ] 

Jark Wu commented on FLINK-19754:
-

Could you share the job graph shown in the Web UI?

> Cannot have more than one execute() or executeAsync() call in a single 
> environment.
> ---
>
> Key: FLINK-19754
> URL: https://issues.apache.org/jira/browse/FLINK-19754
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.11.2
>Reporter: little-tomato
>Priority: Major
>
> I run this code on my Standalone Cluster. When I submit the job, the error log 
> is as follows:
> {code}
> 2020-10-20 11:53:42,969 WARN 
> org.apache.flink.client.deployment.application.DetachedApplicationRunner [] - 
> Could not execute application: 
>  org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: Cannot have more than one execute() or executeAsync() call 
> in a single environment.
>  at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
>  ~[flink-clients_2.12-1.11.0.jar:1.11.0]
>  at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>  ~[flink-clients_2.12-1.11.0.jar:1.11.0]
>  at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) 
> ~[flink-clients_2.12-1.11.0.jar:1.11.0]
>  at 
> org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78)
>  ~[flink-clients_2.12-1.11.0.jar:1.11.0]
>  at 
> org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:67)
>  ~[flink-clients_2.12-1.11.0.jar:1.11.0]
>  at 
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:100)
>  ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>  at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>  [?:1.8.0_221]
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> [?:1.8.0_221]
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_221]
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  [?:1.8.0_221]
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  [?:1.8.0_221]
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  [?:1.8.0_221]
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  [?:1.8.0_221]
>  at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
>  Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot have more 
> than one execute() or executeAsync() call in a single environment.
>  at 
> org.apache.flink.client.program.StreamContextEnvironment.validateAllowedExecution(StreamContextEnvironment.java:139)
>  ~[flink-clients_2.12-1.11.0.jar:1.11.0]
>  at 
> org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:127)
>  ~[flink-clients_2.12-1.11.0.jar:1.11.0]
>  at 
> org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
>  ~[flink-clients_2.12-1.11.0.jar:1.11.0]
>  at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1697)
>  ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>  at cn.cuiot.dmp.ruleengine.job.RuleEngineJob.main(RuleEngineJob.java:556) 
> ~[?:?]
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_221]
>  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
> ~[?:1.8.0_221]
>  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  ~[?:1.8.0_221]
>  at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_221]
>  at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>  ~[flink-clients_2.12-1.11.0.jar:1.11.0]
> {code}
> my code is:
> {code:java}
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>  EnvironmentSettings bsSettings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>  StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
>  ...
>  FlinkKafkaConsumer<String> myConsumer = new 
> FlinkKafkaConsumer<>("kafkatopic", new SimpleStringSchema(), 
> properties);
>  myConsumer.setStartFromLatest();
> DataStream<String> kafkaDataStream = env.addSource(myConsumer);
> SingleOutputStreamOperator sourceStream = kafkaDataStream
>  .map(new MapFunction()
> { ... }
> );
> DataStream 

[GitHub] [flink] flinkbot edited a comment on pull request #13737: [hotfix] Fix DeadlockBreakupTest due to Calcite upgrade

2020-10-21 Thread GitBox


flinkbot edited a comment on pull request #13737:
URL: https://github.com/apache/flink/pull/13737#issuecomment-714194471


   
   ## CI report:
   
   * 20fd494ddd03e04ffe024ee24e4a31941950f787 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8062)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13729: [FLINK-19644][hive] Support read specific partition of Hive table in temporal join

2020-10-21 Thread GitBox


flinkbot edited a comment on pull request #13729:
URL: https://github.com/apache/flink/pull/13729#issuecomment-713673157


   
   ## CI report:
   
   * 659b57b42486d3b79197653fec933ce42766388e Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8033)
 
   * f028c8d52466fbd6fd5a9cbf3ed85dcbc65fd2ac UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-19754) Cannot have more than one execute() or executeAsync() call in a single environment.

2020-10-21 Thread little-tomato (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17218719#comment-17218719
 ] 

little-tomato commented on FLINK-19754:
---

I tried it, but the StreamExecutionEnvironment/DataStream part of the code does not take effect.
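
One way this is commonly worked around in 1.11 (a sketch with illustrative sources, assuming the table results should feed back into the DataStream job): convert the tables to DataStreams so everything lives in one topology, and a single {{env.execute()}} submits one job, instead of mixing {{StatementSet#execute()}} with {{env.execute()}}.

{code:java}
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class SingleExecuteJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Stand-in for the Kafka source in the report.
        DataStream<String> source = env.fromElements("a", "b", "a");
        tEnv.createTemporaryView("events", source);

        Table counts = tEnv.sqlQuery("SELECT f0, COUNT(*) FROM events GROUP BY f0");
        // Bringing the table back into the DataStream keeps one topology,
        // so only a single execute() call is needed.
        tEnv.toRetractStream(counts, Row.class).print();

        env.execute("single-job");
    }
}
{code}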

> Cannot have more than one execute() or executeAsync() call in a single 
> environment.
> ---
>
> Key: FLINK-19754
> URL: https://issues.apache.org/jira/browse/FLINK-19754
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.11.2
>Reporter: little-tomato
>Priority: Major
>
> I run this code on my Standalone Cluster. When I submit the job, the error log 
> is as follows:
> {code}
> 2020-10-20 11:53:42,969 WARN 
> org.apache.flink.client.deployment.application.DetachedApplicationRunner [] - 
> Could not execute application: 
>  org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: Cannot have more than one execute() or executeAsync() call 
> in a single environment.
>  at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
>  ~[flink-clients_2.12-1.11.0.jar:1.11.0]
>  at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>  ~[flink-clients_2.12-1.11.0.jar:1.11.0]
>  at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) 
> ~[flink-clients_2.12-1.11.0.jar:1.11.0]
>  at 
> org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78)
>  ~[flink-clients_2.12-1.11.0.jar:1.11.0]
>  at 
> org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:67)
>  ~[flink-clients_2.12-1.11.0.jar:1.11.0]
>  at 
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:100)
>  ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>  at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>  [?:1.8.0_221]
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> [?:1.8.0_221]
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_221]
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  [?:1.8.0_221]
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  [?:1.8.0_221]
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  [?:1.8.0_221]
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  [?:1.8.0_221]
>  at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
>  Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot have more 
> than one execute() or executeAsync() call in a single environment.
>  at 
> org.apache.flink.client.program.StreamContextEnvironment.validateAllowedExecution(StreamContextEnvironment.java:139)
>  ~[flink-clients_2.12-1.11.0.jar:1.11.0]
>  at 
> org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:127)
>  ~[flink-clients_2.12-1.11.0.jar:1.11.0]
>  at 
> org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
>  ~[flink-clients_2.12-1.11.0.jar:1.11.0]
>  at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1697)
>  ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>  at cn.cuiot.dmp.ruleengine.job.RuleEngineJob.main(RuleEngineJob.java:556) 
> ~[?:?]
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_221]
>  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
> ~[?:1.8.0_221]
>  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  ~[?:1.8.0_221]
>  at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_221]
>  at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>  ~[flink-clients_2.12-1.11.0.jar:1.11.0]
> {code}
> my code is:
> {code:java}
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>  EnvironmentSettings bsSettings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>  StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
>  ...
>  FlinkKafkaConsumer<String> myConsumer = new 
> FlinkKafkaConsumer<>("kafkatopic", new SimpleStringSchema(), 
> properties);
>  myConsumer.setStartFromLatest();
> DataStream<String> kafkaDataStream = env.addSource(myConsumer);
> SingleOutputStreamOperator sourceStream = kafkaDataStream
>  .map(new 

[GitHub] [flink] curcur edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

2020-10-21 Thread GitBox


curcur edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-713341581


   Thanks @rkhachatryan so much for the great question on how different threads 
access the same view (in other words, the threading model of the netty + task 
thread interaction on the view). Thanks @zhijiangW a lot for the explanation of 
the underlying threading model, based on offline discussion, and the great 
insights.
   
   I've spent some time on the different paths by which the view is 
created/released in different scenarios; here is what I think is happening. 
@zhijiangW , please correct me if I miss anything, thanks!
   
   For a streaming job, each `view` is created through a 
PartitionRequestServerHandler (one of the netty worker threads) handling a 
PartitionRequest. The created view is owned by the subPartition, and a reference 
to the view is returned and bound to a reader. The reader belongs to the 
PartitionRequestServerHandler.
   
   `PipelinedApproximateSubpartition#releaseView` is used in two places:
   1. PipelinedApproximateSubpartition#createView (from a netty thread, which 
can see the current version of the view belonging to the subpartition)
   2. PipelinedApproximateSubpartition#releaseAllResources,
	which is called from two places: 
	- one from the `reader`: the reader uses its own reference to the view to 
release resources. This happens on downstream task cancellation. 
	- one from `subPartition release`: this is fine, the subpartition 
releases its own view.
   
   Two questions:
   1. Does the old view continue to read data if it is not disposed of 
successfully before the new view is created?
   - No, because the reader and the view are removed upon the downstream 
task's cancellation request;
   
   2. What if netty thread1 releases the view after netty thread2 recreates the 
view?
   - Thread2 releases the view that thread1 holds a reference to before 
creating a new view. Thread1 cannot release the view again afterwards, since a 
view can only be released once.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13653: [FLINK-17528][table] Remove RowData#get() and ArrayData#get() and use FieldGetter and ElementGetter instead

2020-10-21 Thread GitBox


flinkbot edited a comment on pull request #13653:
URL: https://github.com/apache/flink/pull/13653#issuecomment-709337488


   
   ## CI report:
   
   * cadf9a2a13d3113395f199431881fff216b9a50a Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7974)
 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7958)
 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8017)
 
   * 0c876d6befc487412629e6a1e8883fa5fbeb31e6 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] curcur edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

2020-10-21 Thread GitBox


curcur edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-713341581


   Thanks @rkhachatryan so much for the great question on how different threads 
access the same view (in other words, the threading model of the netty + task 
thread interaction on the view). Thanks @zhijiangW a lot for the explanation of 
the underlying threading model, based on offline discussion, and the great 
insights.
   
   I've spent some time on the different paths by which the view is 
created/released in different scenarios; here is what I think is happening. 
@zhijiangW , please correct me if I miss anything, thanks!
   
   For a streaming job, each `view` is created through a 
PartitionRequestServerHandler (one of the netty worker threads) handling a 
PartitionRequest. The created view is owned by the subPartition, and a reference 
to the view is returned and bound to a reader. The reader belongs to the 
PartitionRequestServerHandler.
   
   `PipelinedApproximateSubpartition#releaseView` is used in two places:
   1. PipelinedApproximateSubpartition#createView (from a netty thread, which 
can see the current version of the view belonging to the subpartition)
   2. PipelinedApproximateSubpartition#releaseAllResources,
	which is called from two places: 
	- one from the `reader`: the reader uses its own reference to the view to 
release resources. This happens on downstream task cancellation. 
	- one from `subPartition release`: this is fine, the subpartition 
releases its own view.
   
   Two questions:
   1. Does the old view continue to read data if it is not disposed of 
successfully before the new view is created?
   - No, because the reader and the view are removed upon the downstream 
task's cancellation request;
   
   2. What if netty thread1 releases the view after netty thread2 recreates the 
view?
   - Thread2 releases the view that thread1 holds a reference to before 
creating a new view. Thread1 cannot release the view again, since a view can 
only be released once.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13623: [FLINK-19606][table-runtime] Implement streaming window join operator

2020-10-21 Thread GitBox


flinkbot edited a comment on pull request #13623:
URL: https://github.com/apache/flink/pull/13623#issuecomment-708132144


   
   ## CI report:
   
   * 15cc49ee7ecb3060ec52ab5545f4b6fae58c25de Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7559)
 
   * 1de96f52dee7fc493ca0cad827189955520362cc UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] leonardBang commented on a change in pull request #13729: [FLINK-19644][hive] Support read specific partition of Hive table in temporal join

2020-10-21 Thread GitBox


leonardBang commented on a change in pull request #13729:
URL: https://github.com/apache/flink/pull/13729#discussion_r509860215



##
File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveLookupFunction.java
##
@@ -47,29 +59,46 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static 
org.apache.flink.connectors.hive.HiveTableFactory.LOOKUP_JOIN_CACHE_TTL;
+import static 
org.apache.flink.connectors.hive.HiveTableFactory.LOOKUP_JOIN_PARTITION;
+import static 
org.apache.flink.table.catalog.hive.util.HivePartitionUtils.getPartitionByPartitionSpecs;
+import static 
org.apache.flink.table.catalog.hive.util.HivePartitionUtils.getTableProps;
+import static 
org.apache.flink.table.catalog.hive.util.HivePartitionUtils.toHiveTablePartition;
+import static 
org.apache.flink.table.catalog.hive.util.HivePartitionUtils.validateAndParsePartitionSpecs;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
- * Lookup table function for filesystem connector tables.
+ * Lookup table function for Hive connector tables.
  */
-public class FileSystemLookupFunction extends 
TableFunction {
+public class HiveLookupFunction extends 
TableFunction {

Review comment:
   The `FileSystemLookupFunction` is only used in `HiveTableSource`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Closed] (FLINK-18971) Support to mount kerberos conf as ConfigMap and Keytab as Secret

2020-10-21 Thread Xintong Song (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xintong Song closed FLINK-18971.

Resolution: Done

Done via
* master: dd481134f24e8fd1ce65e827e0c5c6350c5be9b2

> Support to mount kerberos conf as ConfigMap and Keytab as Secret
> -
>
> Key: FLINK-18971
> URL: https://issues.apache.org/jira/browse/FLINK-18971
> Project: Flink
>  Issue Type: Sub-task
>  Components: Deployment / Kubernetes
>Reporter: Yangze Guo
>Assignee: Yangze Guo
>Priority: Major
>  Labels: pull-request-available
>
> Currently, if users want to enable Kerberos authentication, they need to build 
> a custom image with the keytab and krb5 conf file. To improve usability, we need 
> to create a ConfigMap and a Secret for the krb5 conf and keytab when needed.
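
A rough sketch of what the change enables, assuming the fabric8 Kubernetes client (resource names and data keys below are illustrative, not the merged implementation):

{code:java}
import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
import io.fabric8.kubernetes.api.model.Secret;
import io.fabric8.kubernetes.api.model.SecretBuilder;

import java.util.Base64;

// Ship krb5.conf as a ConfigMap and the keytab as a Secret instead of baking
// both into a custom image; the resources can then be mounted into the
// JobManager/TaskManager pods as files.
final class KerberosResourcesSketch {

    static ConfigMap krb5ConfigMap(String clusterId, String krb5ConfContent) {
        return new ConfigMapBuilder()
            .withNewMetadata().withName(clusterId + "-krb5-conf").endMetadata()
            .addToData("krb5.conf", krb5ConfContent)
            .build();
    }

    static Secret keytabSecret(String clusterId, byte[] keytabBytes) {
        // Kubernetes Secret data must be base64-encoded.
        return new SecretBuilder()
            .withNewMetadata().withName(clusterId + "-keytab").endMetadata()
            .addToData("krb5.keytab", Base64.getEncoder().encodeToString(keytabBytes))
            .build();
    }
}
{code}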



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-18971) Support to mount kerberos conf as ConfigMap and Keytab as Secret

2020-10-21 Thread Xintong Song (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xintong Song updated FLINK-18971:
-
Fix Version/s: 1.12.0

> Support to mount kerberos conf as ConfigMap and Keytab as Secret
> -
>
> Key: FLINK-18971
> URL: https://issues.apache.org/jira/browse/FLINK-18971
> Project: Flink
>  Issue Type: Sub-task
>  Components: Deployment / Kubernetes
>Reporter: Yangze Guo
>Assignee: Yangze Guo
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0
>
>
> Currently, if users want to enable Kerberos authentication, they need to build 
> a custom image with the keytab and krb5 conf file. To improve usability, we need 
> to create a ConfigMap and a Secret for the krb5 conf and keytab when needed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] xintongsong closed pull request #13255: [FLINK-18971] Support to mount kerberos conf as ConfigMap and Keytab …

2020-10-21 Thread GitBox


xintongsong closed pull request #13255:
URL: https://github.com/apache/flink/pull/13255


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] JingsongLi commented on a change in pull request #13729: [FLINK-19644][hive] Support read specific partition of Hive table in temporal join

2020-10-21 Thread GitBox


JingsongLi commented on a change in pull request #13729:
URL: https://github.com/apache/flink/pull/13729#discussion_r509859082



##
File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveLookupFunction.java
##
@@ -47,29 +59,46 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static 
org.apache.flink.connectors.hive.HiveTableFactory.LOOKUP_JOIN_CACHE_TTL;
+import static 
org.apache.flink.connectors.hive.HiveTableFactory.LOOKUP_JOIN_PARTITION;
+import static 
org.apache.flink.table.catalog.hive.util.HivePartitionUtils.getPartitionByPartitionSpecs;
+import static 
org.apache.flink.table.catalog.hive.util.HivePartitionUtils.getTableProps;
+import static 
org.apache.flink.table.catalog.hive.util.HivePartitionUtils.toHiveTablePartition;
+import static 
org.apache.flink.table.catalog.hive.util.HivePartitionUtils.validateAndParsePartitionSpecs;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
- * Lookup table function for filesystem connector tables.
+ * Lookup table function for Hive connector tables.
  */
-public class FileSystemLookupFunction extends 
TableFunction {
+public class HiveLookupFunction extends 
TableFunction {

Review comment:
   Why do this? Why shouldn't the filesystem connector also have this lookup capability?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Closed] (FLINK-19733) Make fast_operation and slow_operation produce functions consistent

2020-10-21 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu closed FLINK-19733.
---
Fix Version/s: 1.12.0
 Assignee: Huang Xingbo
   Resolution: Fixed

Merged to master via d5e81688d85b8f24161a4397c8ef8dfac0bbcd51

> Make fast_operation and slow_operation produce functions consistent
> ---
>
> Key: FLINK-19733
> URL: https://issues.apache.org/jira/browse/FLINK-19733
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python
>Reporter: Huang Xingbo
>Assignee: Huang Xingbo
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0
>
>
> The function generated by slow_operation relies on Python syntax features. In
> order to better refactor the Python operations, we need to keep the functions
> generated by fast_operation and slow_operation consistent.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] wangyang0918 commented on a change in pull request #13644: [FLINK-19542][k8s]Implement LeaderElectionService and LeaderRetrievalService based on Kubernetes API

2020-10-21 Thread GitBox


wangyang0918 commented on a change in pull request #13644:
URL: https://github.com/apache/flink/pull/13644#discussion_r509857812



##
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClient.java
##
@@ -104,6 +106,67 @@ KubernetesWatch watchPodsAndDoCallback(
Map labels,
WatchCallbackHandler podCallbackHandler);
 
+   /**
+* Create the ConfigMap with specified content. If the ConfigMap 
already exists, a FlinkRuntimeException will be
+* thrown.
+*
+* @param configMap ConfigMap.
+*
+* @return Return the ConfigMap create future.
+*/
+   CompletableFuture createConfigMap(KubernetesConfigMap configMap);
+
+   /**
+* Get the ConfigMap with specified name.
+*
+* @param name ConfigMap name.
+*
+* @return Return the ConfigMap, or empty if the ConfigMap does not 
exist.
+*/
+   Optional getConfigMap(String name);
+
+   /**
+    * Update an existing ConfigMap with the data. Benefiting from the resource version
+    * (https://kubernetes.io/docs/reference/using-api/api-concepts/#resource-versions) and combined
+    * with {@link #getConfigMap(String)}, we could perform a get-check-and-update transactional
+    * operation. Since concurrent modification could happen on the same ConfigMap, the update
+    * operation may fail and we need to retry internally. The max retry attempts can be configured
+    * via {@link org.apache.flink.kubernetes.configuration.KubernetesConfigOptions#KUBERNETES_TRANSACTIONAL_OPERATION_MAX_RETRIES}.
+    *
+    * @param configMapName ConfigMap to be replaced with.
+    * @param function      Function to be applied to the obtained ConfigMap and get a new updated one. If the returned

Review comment:
   For example, we do serialization/deserialization/discard-state for the job 
graph store or the completed checkpoints, and we could encounter an `Exception`. 
I think we could retry in such a situation.
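
   As a rough sketch of the get-check-and-update loop under discussion 
(hypothetical `Store` interface and method names, not the real `FlinkKubeClient` 
signatures):

   final class CheckAndUpdateSketch {
       interface Store {
           java.util.Optional<String> load(String name);  // get
           boolean tryReplace(String name, String data);  // update; fails on a resource-version conflict
       }

       static boolean checkAndUpdate(
               Store store,
               String name,
               java.util.function.Function<String, java.util.Optional<String>> updater,
               int maxRetries) {
           for (int attempt = 0; attempt < maxRetries; attempt++) {
               final String current = store.load(name)
                   .orElseThrow(() -> new IllegalStateException("ConfigMap " + name + " does not exist"));
               final java.util.Optional<String> updated = updater.apply(current);  // check
               if (!updated.isPresent()) {
                   return false;  // the check did not pass; nothing to write
               }
               if (store.tryReplace(name, updated.get())) {  // update
                   return true;
               }
               // A concurrent writer won the race: reload and retry.
           }
           return false;
       }
   }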





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Closed] (FLINK-19734) Replace 'collection' connector by 'values' connector for temporal join plan tests

2020-10-21 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19734?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu closed FLINK-19734.
---
Resolution: Fixed

Fixed in master: fc91b0830384e10b87ff0d8ab05258e3f89a8d1b

> Replace 'collection' connector by 'values' connector for temporal join plan 
> tests
> -
>
> Key: FLINK-19734
> URL: https://issues.apache.org/jira/browse/FLINK-19734
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner, Table SQL / Runtime
>Reporter: Leonard Xu
>Assignee: Jark Wu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0
>
>
> Currently, both the COLLECTION and VALUES connectors are `LookupTableSource`s; we
> can add a non-lookup table source connector to cover the scan-only case.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] dianfu closed pull request #13713: [FLINK-19733][python] Make fast_operation and slow_operation produce functions consistent

2020-10-21 Thread GitBox


dianfu closed pull request #13713:
URL: https://github.com/apache/flink/pull/13713


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] wuchong merged pull request #13708: [FLINK-19734][table-planner-blink] Replace 'collection' connector by 'values' connector for temporal join plan tests

2020-10-21 Thread GitBox


wuchong merged pull request #13708:
URL: https://github.com/apache/flink/pull/13708


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] wuchong commented on pull request #13708: [FLINK-19734][table-planner-blink] Replace 'collection' connector by 'values' connector for temporal join plan tests

2020-10-21 Thread GitBox


wuchong commented on pull request #13708:
URL: https://github.com/apache/flink/pull/13708#issuecomment-714197085


   Thanks for the review. 
   
   Merging...



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] wangyang0918 commented on a change in pull request #13644: [FLINK-19542][k8s]Implement LeaderElectionService and LeaderRetrievalService based on Kubernetes API

2020-10-21 Thread GitBox


wangyang0918 commented on a change in pull request #13644:
URL: https://github.com/apache/flink/pull/13644#discussion_r509856352



##
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
##
@@ -219,6 +230,71 @@ public KubernetesWatch watchPodsAndDoCallback(
.watch(new 
KubernetesPodsWatcher(podCallbackHandler)));
}
 
+   @Override
+   public CompletableFuture createConfigMap(KubernetesConfigMap configMap) {
+       final String configMapName = configMap.getName();
+       return CompletableFuture.runAsync(
+           () -> this.internalClient.configMaps().inNamespace(namespace).create(configMap.getInternalResource()),
+           kubeClientExecutorService)
+           .whenComplete((ignored, throwable) -> {
+               if (throwable != null) {
+                   throw new FlinkRuntimeException("Failed to create ConfigMap " + configMapName, throwable);
+               }
+           });
+   }
+
+   @Override
+   public Optional getConfigMap(String name) {
+       final ConfigMap configMap = this.internalClient.configMaps().inNamespace(namespace).withName(name).get();
+       return configMap == null ? Optional.empty() : Optional.of(new KubernetesConfigMap(configMap));
+   }
+
+   @Override
+   public CompletableFuture checkAndUpdateConfigMap(
+           String configMapName,
+           FunctionWithException, ?> function) {
+       return FutureUtils.retry(
+           () -> CompletableFuture.supplyAsync(
+               () -> getConfigMap(configMapName)
+                   .map(FunctionUtils.uncheckedFunction(configMap -> {
+                       final boolean updated = function.apply(configMap).map(
+                           updatedConfigMap -> {
+                               this.internalClient.configMaps()
+                                   .inNamespace(namespace)
+                                   .createOrReplace(updatedConfigMap.getInternalResource());
+                               return true;
+                           }).orElse(false);
+                       if (!updated) {
+                           LOG.warn("Trying to update ConfigMap {} to {} without checking pass, ignoring.",
+                               configMap.getName(), configMap.getData());
+                       }
+                       return updated;
+                   }))
+                   .orElseThrow(
+                       () -> new FlinkRuntimeException("ConfigMap " + configMapName + " not exists.")),

Review comment:
   What I mean is that the ConfigMap could be created in the 
`KubernetesLeaderElectionService#Watcher`. So even if we first get an 
`Optional.empty()`, we could get the correct ConfigMap by retrying.
   
   I will add two tests here:
   * the ConfigMap never exists and the retry fails
   * the ConfigMap exists at the very beginning and the retry succeeds
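
   A compact sketch of those two scenarios against a hypothetical in-memory 
client (not the real test fixture):

   final class ConfigMapRetryScenariosSketch {
       static final class InMemoryClient {
           final java.util.concurrent.ConcurrentMap<String, String> store =
               new java.util.concurrent.ConcurrentHashMap<>();
           java.util.Optional<String> getConfigMap(String name) {
               return java.util.Optional.ofNullable(store.get(name));
           }
       }

       static java.util.Optional<String> getWithRetry(InMemoryClient client, String name, int maxRetries) {
           for (int attempt = 0; attempt < maxRetries; attempt++) {
               java.util.Optional<String> cm = client.getConfigMap(name);
               if (cm.isPresent()) {
                   return cm;
               }
           }
           return java.util.Optional.empty();
       }

       public static void main(String[] args) {
           InMemoryClient client = new InMemoryClient();
           // Scenario 1: the ConfigMap never exists, so every attempt fails.
           assert !getWithRetry(client, "leader-cm", 3).isPresent();
           // Scenario 2: the ConfigMap exists (e.g. created by the watcher), so the get succeeds.
           client.store.put("leader-cm", "leader=jm-1");
           assert getWithRetry(client, "leader-cm", 3).isPresent();
       }
   }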





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Closed] (FLINK-19655) NPE when using blink planner and TemporalTableFunction after setting IdleStateRetentionTime

2020-10-21 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19655?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu closed FLINK-19655.
---
Fix Version/s: 1.12.0
   Resolution: Fixed

Fixed in master: daeda68edf3466a3f9347c25bdf866ef4f620396

> NPE when using blink planner and TemporalTableFunction after setting 
> IdleStateRetentionTime 
> 
>
> Key: FLINK-19655
> URL: https://issues.apache.org/jira/browse/FLINK-19655
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.10.0, 1.11.0
>Reporter: seunjjs
>Assignee: seunjjs
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0
>
>
> My code is here:
> {code:java}
> EnvironmentSettings bsSettings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
> bsSettings);
> tableEnv.getConfig().setIdleStateRetentionTime(Time.seconds(60), 
> Time.seconds(600));
> final Table table = tableEnv.from("tableName");
> final TableFunction function = table.createTemporalTableFunction(
> temporalTableEntry.getTimeAttribute(),
> String.join(",", 
> temporalTableEntry.getPrimaryKeyFields()));
> tableEnv.registerFunction(temporalTableEntry.getName(), function);
> {code}
> An NPE was thrown when I executed my program.
> {code:java}
> java.lang.NullPointerException
>   at 
> org.apache.flink.table.runtime.operators.join.temporal.BaseTwoInputStreamOperatorWithStateRetention.registerProcessingCleanupTimer(BaseTwoInputStreamOperatorWithStateRetention.java:109)
>   at 
> org.apache.flink.table.runtime.operators.join.temporal.TemporalProcessTimeJoinOperator.processElement2(TemporalProcessTimeJoinOperator.java:98)
>   at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processRecord2(StreamTwoInputProcessor.java:145)
>   at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.lambda$new$1(StreamTwoInputProcessor.java:107)
>   at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessor.java:362)
>   at 
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
>   at 
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
>   at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:185)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>   at java.lang.Thread.run(Thread.java:748)
> {code}
> 
> And when I changed to useOldPlanner, it worked fine. When I debugged the
> code, I found that BaseTwoInputStreamOperatorWithStateRetention#open was not
> executed.
> Here is BaseTwoInputStreamOperatorWithStateRetention#open code.
> {code:java}
> public void open() throws Exception {
>   initializeTimerService();
>   if (stateCleaningEnabled) {
>   ValueStateDescriptor cleanupStateDescriptor =
>   new ValueStateDescriptor<>(CLEANUP_TIMESTAMP, 
> Types.LONG);
>   latestRegisteredCleanupTimer = 
> getRuntimeContext().getState(cleanupStateDescriptor);
>   }
>   }
> {code}
> Here is TemporalProcessTimeJoinOperator#open code.
> {code:java}
> public void open() throws Exception {
>   this.joinCondition = 
> generatedJoinCondition.newInstance(getRuntimeContext().getUserCodeClassLoader());
>   FunctionUtils.setFunctionRuntimeContext(joinCondition, 
> getRuntimeContext());
>   FunctionUtils.openFunction(joinCondition, new Configuration());
>   ValueStateDescriptor rightStateDesc = new 
> ValueStateDescriptor<>("right", rightType);
>   this.rightState = getRuntimeContext().getState(rightStateDesc);
>   this.collector = new TimestampedCollector<>(output);
>   this.outRow = new JoinedRow();
>   // consider watermark from left stream only.
>   super.processWatermark2(Watermark.MAX_WATERMARK);
>   }
> {code}
> I compared the code with 
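
The fix merged via pull request #13675 adds the missing super.open() call to 
TemporalProcessTimeJoinOperator#open. A minimal sketch of the corrected method, 
abbreviated from the code quoted above:

{code:java}
@Override
public void open() throws Exception {
	// The missing call: initializes the timer service and, when state cleaning
	// is enabled, the latestRegisteredCleanupTimer state that
	// registerProcessingCleanupTimer() dereferences.
	super.open();
	this.joinCondition = generatedJoinCondition.newInstance(getRuntimeContext().getUserCodeClassLoader());
	FunctionUtils.setFunctionRuntimeContext(joinCondition, getRuntimeContext());
	FunctionUtils.openFunction(joinCondition, new Configuration());
	// ... remainder unchanged ...
}
{code}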

[GitHub] [flink] wuchong merged pull request #13675: [FLINK-19655][flink-table-runtime-blink] add super.open() and write unit test for temporal process join

2020-10-21 Thread GitBox


wuchong merged pull request #13675:
URL: https://github.com/apache/flink/pull/13675


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot commented on pull request #13737: [hotfix] Fix DeadlockBreakupTest due to Calcite upgrade

2020-10-21 Thread GitBox


flinkbot commented on pull request #13737:
URL: https://github.com/apache/flink/pull/13737#issuecomment-714194471


   
   ## CI report:
   
   * 20fd494ddd03e04ffe024ee24e4a31941950f787 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13711: [FLINK-19721] [flink-runtime] Support exponential backoff retries in RpcGatewayRetriever

2020-10-21 Thread GitBox


flinkbot edited a comment on pull request #13711:
URL: https://github.com/apache/flink/pull/13711#issuecomment-713051309


   
   ## CI report:
   
   * 6c8fb272a1859f797ccd5cf62cb9392d9b287e9b Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7963)
 
   * 1f877b3d8220d0896d3ae433e20a855ae1481411 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8059)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (FLINK-19756) Use multi-input optimization by default

2020-10-21 Thread Caizhi Weng (Jira)
Caizhi Weng created FLINK-19756:
---

 Summary: Use multi-input optimization by default
 Key: FLINK-19756
 URL: https://issues.apache.org/jira/browse/FLINK-19756
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Caizhi Weng
 Fix For: 1.12.0


After the multiple input operator is introduced we should use this optimization 
by default. This will affect a large amount of plan tests so we will do this in 
an independent subtask.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot commented on pull request #13737: [hotfix] Fix DeadlockBreakupTest due to Calcite upgrade

2020-10-21 Thread GitBox


flinkbot commented on pull request #13737:
URL: https://github.com/apache/flink/pull/13737#issuecomment-714191751


   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit 20fd494ddd03e04ffe024ee24e4a31941950f787 (Thu Oct 22 
03:06:39 UTC 2020)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] TsReaper opened a new pull request #13737: [hotfix] Fix DeadlockBreakupTest due to Calcite upgrade

2020-10-21 Thread GitBox


TsReaper opened a new pull request #13737:
URL: https://github.com/apache/flink/pull/13737


   ## What is the purpose of the change
   
   This commit fixes the plan test failure in `DeadlockBreakupTest` due to 
Calcite upgrade.
   
   ## Brief change log
   
   - Fix tests failure in `DeadlockBreakupTest`.
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-19629) Avro format cause NullPointException,as null value in MAP type's value type

2020-10-21 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17218707#comment-17218707
 ] 

Jark Wu commented on FLINK-19629:
-

Fixed in
 - master: 3c661074b2a597db312ebaf6734c58eb66464ba4
 - release-1.11: TODO

> Avro format cause NullPointException,as null value in MAP type's  value type
> 
>
> Key: FLINK-19629
> URL: https://issues.apache.org/jira/browse/FLINK-19629
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.11.2
>Reporter: shizhengchao
>Assignee: shizhengchao
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.12.0, 1.11.3
>
>
> create table tableA (
>   name  STRING,
>   hobly MAP,
>   phone STRING
> ) with (
>   'connector' = 'kafka-0.11',
>   'topic' = 'ShizcTest',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'properties.group.id' = 'ShizcTest',
>   'scan.startup.mode' = 'earliest-offset',
>   'format' = 'avro'
> );
> if hobly has a null value like this:
> {"name": "shizc", "hobly": {"key1":null}, "phone": "1104564"}
> it causes a NullPointerException:
> {code:java}
> java.io.IOException: Failed to deserialize Avro record.
>   at 
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:150)
>   at 
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:75)
>   at 
> org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:81)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.runFetchLoop(Kafka010Fetcher.java:147)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>   at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
> Caused by: java.lang.NullPointerException: null
>   at 
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createConverter$57e941b$5(AvroRowDataDeserializationSchema.java:252)
>   at 
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createMapConverter$7941d275$1(AvroRowDataDeserializationSchema.java:315)
>   at 
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createNullableConverter$c3bac5d8$1(AvroRowDataDeserializationSchema.java:221)
>   at 
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createRowConverter$80d8b6bd$1(AvroRowDataDeserializationSchema.java:206)
>   at 
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:148)
>   ... 8 common frames omitted
> {code}
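
A plausible shape of the fix, wrapping the map value conversion so that a null 
Avro value maps to a null Flink value instead of an NPE (illustrative names, 
not the exact patch):

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class NullableMapValueSketch {

    // Wrap a converter so it tolerates null input.
    static Function<Object, Object> nullable(Function<Object, Object> inner) {
        return avroValue -> avroValue == null ? null : inner.apply(avroValue);
    }

    static Map<String, Object> convertMap(
            Map<CharSequence, Object> avroMap,
            Function<Object, Object> valueConverter) {
        final Function<Object, Object> safeValueConverter = nullable(valueConverter);
        final Map<String, Object> result = new HashMap<>();
        for (Map.Entry<CharSequence, Object> entry : avroMap.entrySet()) {
            // Avro map keys are non-null strings; values may be null.
            result.put(entry.getKey().toString(), safeValueConverter.apply(entry.getValue()));
        }
        return result;
    }
}
{code}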



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-19629) Fix NullPointException when deserializing map field with null value for Avro format

2020-10-21 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-19629:

Summary:  Fix NullPointException when deserializing map field with null 
value for Avro format  (was: Avro format cause NullPointException,as null value 
in MAP type's  value type)

>  Fix NullPointException when deserializing map field with null value for Avro 
> format
> 
>
> Key: FLINK-19629
> URL: https://issues.apache.org/jira/browse/FLINK-19629
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.11.2
>Reporter: shizhengchao
>Assignee: shizhengchao
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.12.0, 1.11.3
>
>
> create table tableA (
>   name  STRING,
>   hobly MAP,
>   phone STRING
> ) with (
>   'connector' = 'kafka-0.11',
>   'topic' = 'ShizcTest',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'properties.group.id' = 'ShizcTest',
>   'scan.startup.mode' = 'earliest-offset',
>   'format' = 'avro'
> );
> if hobly has a null value like this:
> {"name": "shizc", "hobly": {"key1":null}, "phone": "1104564"}
> it causes a NullPointerException:
> {code:java}
> java.io.IOException: Failed to deserialize Avro record.
>   at 
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:150)
>   at 
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:75)
>   at 
> org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:81)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.runFetchLoop(Kafka010Fetcher.java:147)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>   at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
> Caused by: java.lang.NullPointerException: null
>   at 
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createConverter$57e941b$5(AvroRowDataDeserializationSchema.java:252)
>   at 
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createMapConverter$7941d275$1(AvroRowDataDeserializationSchema.java:315)
>   at 
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createNullableConverter$c3bac5d8$1(AvroRowDataDeserializationSchema.java:221)
>   at 
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createRowConverter$80d8b6bd$1(AvroRowDataDeserializationSchema.java:206)
>   at 
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:148)
>   ... 8 common frames omitted
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-19629) Avro format cause NullPointException,as null value in MAP type's value type

2020-10-21 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-19629:

Fix Version/s: 1.11.3

> Avro format cause NullPointException,as null value in MAP type's  value type
> 
>
> Key: FLINK-19629
> URL: https://issues.apache.org/jira/browse/FLINK-19629
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.11.2
>Reporter: shizhengchao
>Assignee: shizhengchao
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.12.0, 1.11.3
>
>
> create table tableA (
>   name  STRING,
>   hobly MAP,
>   phone STRING
> ) with (
>   'connector' = 'kafka-0.11',
>   'topic' = 'ShizcTest',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'properties.group.id' = 'ShizcTest',
>   'scan.startup.mode' = 'earliest-offset',
>   'format' = 'avro'
> );
> if hobly has a null value like this:
> {"name": "shizc", "hobly": {"key1":null}, "phone": "1104564"}
> it causes a NullPointerException:
> {code:java}
> java.io.IOException: Failed to deserialize Avro record.
>   at 
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:150)
>   at 
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:75)
>   at 
> org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:81)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.runFetchLoop(Kafka010Fetcher.java:147)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>   at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
> Caused by: java.lang.NullPointerException: null
>   at 
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createConverter$57e941b$5(AvroRowDataDeserializationSchema.java:252)
>   at 
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createMapConverter$7941d275$1(AvroRowDataDeserializationSchema.java:315)
>   at 
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createNullableConverter$c3bac5d8$1(AvroRowDataDeserializationSchema.java:221)
>   at 
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createRowConverter$80d8b6bd$1(AvroRowDataDeserializationSchema.java:206)
>   at 
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:148)
>   ... 8 common frames omitted
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] shizhengchao commented on pull request #13634: [FLINK-19629]Fix NullPointException in avro format as null value on MAP type

2020-10-21 Thread GitBox


shizhengchao commented on pull request #13634:
URL: https://github.com/apache/flink/pull/13634#issuecomment-714188821


   > LGTM.
   > 
   > Could you also prepare a pull request for release-1.11 branch?
   
   OK, I will finish it as soon as possible.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] wuchong merged pull request #13634: [FLINK-19629]Fix NullPointException in avro format as null value on MAP type

2020-10-21 Thread GitBox


wuchong merged pull request #13634:
URL: https://github.com/apache/flink/pull/13634


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13736: [FLINK-19654][python][e2e] Reduce pyflink e2e test parallelism

2020-10-21 Thread GitBox


flinkbot edited a comment on pull request #13736:
URL: https://github.com/apache/flink/pull/13736#issuecomment-714183394


   
   ## CI report:
   
   * 259c893dcb4b5692df54cfe7739f95d9e34096e6 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8056)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] shizhengchao commented on a change in pull request #13634: [FLINK-19629]Fix NullPointException in avro format as null value on MAP type

2020-10-21 Thread GitBox


shizhengchao commented on a change in pull request #13634:
URL: https://github.com/apache/flink/pull/13634#discussion_r509848989



##
File path: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java
##
@@ -178,7 +178,7 @@ private static AvroToRowDataConverter 
createArrayConverter(ArrayType arrayType)
 
private static AvroToRowDataConverter createMapConverter(LogicalType 
type) {
final AvroToRowDataConverter keyConverter = 
createConverter(DataTypes.STRING().getLogicalType());

Review comment:
   > Avro can't serialize null key for map.
   
   That's right, and this problem only exists when the map entry type is 
`STRING()`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] wangyang0918 commented on a change in pull request #13644: [FLINK-19542][k8s]Implement LeaderElectionService and LeaderRetrievalService based on Kubernetes API

2020-10-21 Thread GitBox


wangyang0918 commented on a change in pull request #13644:
URL: https://github.com/apache/flink/pull/13644#discussion_r509848812



##
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionService.java
##
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.highavailability;
+
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import 
org.apache.flink.kubernetes.kubeclient.KubernetesLeaderElectionConfiguration;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
+import 
org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
+import org.apache.flink.kubernetes.utils.KubernetesUtils;
+import org.apache.flink.runtime.leaderelection.AbstractLeaderElectionService;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.util.function.FunctionUtils;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static 
org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;
+import static org.apache.flink.kubernetes.utils.Constants.LEADER_ADDRESS_KEY;
+import static 
org.apache.flink.kubernetes.utils.Constants.LEADER_SESSION_ID_KEY;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Leader election service for multiple JobManagers. The active JobManager is elected using Kubernetes.
+ * The current leader's address as well as its leader session ID is published via a Kubernetes ConfigMap.
+ * Note that the contending lock and the leader storage use the same ConfigMap, and every component
+ * (e.g. ResourceManager, Dispatcher, RestEndpoint, JobManager for each job) will have a separate ConfigMap.
+ */
+public class KubernetesLeaderElectionService extends 
AbstractLeaderElectionService {
+
+   private final FlinkKubeClient kubeClient;
+
+   private final Executor executor;
+
+   private final String configMapName;
+
+   private final KubernetesLeaderElector leaderElector;
+
+   private KubernetesWatch kubernetesWatch;
+
+   // Labels will be used to clean up the ha related ConfigMaps.
+   private Map configMapLabels;
+
+   KubernetesLeaderElectionService(
+   FlinkKubeClient kubeClient,
+   Executor executor,
+   KubernetesLeaderElectionConfiguration leaderConfig) {
+
+   this.kubeClient = checkNotNull(kubeClient, "Kubernetes client 
should not be null.");
+   this.executor = checkNotNull(executor, "Executor should not be 
null.");
+   this.configMapName = leaderConfig.getConfigMapName();
+   this.leaderElector = 
kubeClient.createLeaderElector(leaderConfig, new LeaderCallbackHandlerImpl());
+   this.leaderContender = null;
+   this.configMapLabels = KubernetesUtils.getConfigMapLabels(
+   leaderConfig.getClusterId(), 
LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY);
+   }
+
+   @Override
+   public void internalStart(LeaderContender contender) {
+   CompletableFuture.runAsync(leaderElector::run, executor);
+   kubernetesWatch = kubeClient.watchConfigMaps(configMapName, new 
ConfigMapCallbackHandlerImpl());
+   }
+
+   @Override
+   public void internalStop() {
+   if (kubernetesWatch != null) {
+   kubernetesWatch.close();
+   }
+   }
+
+   @Override
+   protected void writeLeaderInformation() {
+   try {
+   kubeClient.checkAndUpdateConfigMap(
+   configMapName,
+   configMap -> {
+   if 
(leaderElector.hasLeadership(configMap)) {
+   // Get the updated ConfigMap 
with new leader information
+   

[GitHub] [flink] flinkbot edited a comment on pull request #13711: [FLINK-19721] [flink-runtime] Support exponential backoff retries in RpcGatewayRetriever

2020-10-21 Thread GitBox


flinkbot edited a comment on pull request #13711:
URL: https://github.com/apache/flink/pull/13711#issuecomment-713051309


   
   ## CI report:
   
   * 6c8fb272a1859f797ccd5cf62cb9392d9b287e9b Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7963)
 
   * 1f877b3d8220d0896d3ae433e20a855ae1481411 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] wuchong commented on pull request #13634: [FLINK-19629]Fix NullPointException in avro format as null value on MAP type

2020-10-21 Thread GitBox


wuchong commented on pull request #13634:
URL: https://github.com/apache/flink/pull/13634#issuecomment-714187780


   The failed e2e test is a timeout; this pull request shouldn't affect the e2e 
tests.
   
   Merging...



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-19754) Cannot have more than one execute() or executeAsync() call in a single environment.

2020-10-21 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17218704#comment-17218704
 ] 

Jark Wu commented on FLINK-19754:
-

{{stmtSet.execute()}} already submits the job; you don't need to, and shouldn't, 
call {{env.execute(requestPrm.getString("xxx"))}} again.
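
A minimal sketch of the corrected submission (the actual INSERT queries are 
elided in the report, so placeholders are used here):

{code:java}
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

final class SubmitOnceSketch {
    static void submit(StreamTableEnvironment tEnv) {
        StatementSet stmtSet = tEnv.createStatementSet();
        stmtSet.addInsertSql("INSERT INTO user_log_1155 SELECT * FROM mytable");
        stmtSet.addInsertSql("INSERT INTO user_test_1155 SELECT * FROM mytable");
        stmtSet.execute(); // submits the job
        // Do NOT also call env.execute(...): that would be a second submission
        // and raises the FlinkRuntimeException above.
    }
}
{code}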

> Cannot have more than one execute() or executeAsync() call in a single 
> environment.
> ---
>
> Key: FLINK-19754
> URL: https://issues.apache.org/jira/browse/FLINK-19754
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.11.2
>Reporter: little-tomato
>Priority: Major
>
> I ran this code on my standalone cluster. When I submit the job, the error log 
> is as follows:
> {code}
> 2020-10-20 11:53:42,969 WARN 
> org.apache.flink.client.deployment.application.DetachedApplicationRunner [] - 
> Could not execute application: 
>  org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: Cannot have more than one execute() or executeAsync() call 
> in a single environment.
>  at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
>  ~[flink-clients_2.12-1.11.0.jar:1.11.0]
>  at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>  ~[flink-clients_2.12-1.11.0.jar:1.11.0]
>  at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) 
> ~[flink-clients_2.12-1.11.0.jar:1.11.0]
>  at 
> org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78)
>  ~[flink-clients_2.12-1.11.0.jar:1.11.0]
>  at 
> org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:67)
>  ~[flink-clients_2.12-1.11.0.jar:1.11.0]
>  at 
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:100)
>  ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>  at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>  [?:1.8.0_221]
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> [?:1.8.0_221]
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_221]
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  [?:1.8.0_221]
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  [?:1.8.0_221]
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  [?:1.8.0_221]
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  [?:1.8.0_221]
>  at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
>  Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot have more 
> than one execute() or executeAsync() call in a single environment.
>  at 
> org.apache.flink.client.program.StreamContextEnvironment.validateAllowedExecution(StreamContextEnvironment.java:139)
>  ~[flink-clients_2.12-1.11.0.jar:1.11.0]
>  at 
> org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:127)
>  ~[flink-clients_2.12-1.11.0.jar:1.11.0]
>  at 
> org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
>  ~[flink-clients_2.12-1.11.0.jar:1.11.0]
>  at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1697)
>  ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>  at cn.cuiot.dmp.ruleengine.job.RuleEngineJob.main(RuleEngineJob.java:556) 
> ~[?:?]
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_221]
>  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
> ~[?:1.8.0_221]
>  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  ~[?:1.8.0_221]
>  at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_221]
>  at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>  ~[flink-clients_2.12-1.11.0.jar:1.11.0]
> {code}
> my code is:
> {code:java}
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>  EnvironmentSettings bsSettings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>  StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
>  ...
>  FlinkKafkaConsumer myConsumer = new 
> FlinkKafkaConsumer("kafkatopic", new SimpleStringSchema(), 
> properties);
>  myConsumer.setStartFromLatest();
> DataStream kafkaDataStream = env.addSource(myConsumer);
> SingleOutputStreamOperator 

[jira] [Updated] (FLINK-19754) Cannot have more than one execute() or executeAsync() call in a single environment.

2020-10-21 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19754?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-19754:

Description: 
I ran this code on my standalone cluster. When I submit the job, the error log 
is as follows:

{code}
2020-10-20 11:53:42,969 WARN  org.apache.flink.client.deployment.application.DetachedApplicationRunner [] - Could not execute application:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Cannot have more than one execute() or executeAsync() call in a single environment.
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302) ~[flink-clients_2.12-1.11.0.jar:1.11.0]
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) ~[flink-clients_2.12-1.11.0.jar:1.11.0]
	at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) ~[flink-clients_2.12-1.11.0.jar:1.11.0]
	at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78) ~[flink-clients_2.12-1.11.0.jar:1.11.0]
	at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:67) ~[flink-clients_2.12-1.11.0.jar:1.11.0]
	at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:100) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) [?:1.8.0_221]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_221]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_221]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_221]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_221]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_221]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_221]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot have more than one execute() or executeAsync() call in a single environment.
	at org.apache.flink.client.program.StreamContextEnvironment.validateAllowedExecution(StreamContextEnvironment.java:139) ~[flink-clients_2.12-1.11.0.jar:1.11.0]
	at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:127) ~[flink-clients_2.12-1.11.0.jar:1.11.0]
	at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76) ~[flink-clients_2.12-1.11.0.jar:1.11.0]
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1697) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
	at cn.cuiot.dmp.ruleengine.job.RuleEngineJob.main(RuleEngineJob.java:556) ~[?:?]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_221]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_221]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_221]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_221]
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) ~[flink-clients_2.12-1.11.0.jar:1.11.0]
{code}

My code is:

{code:java}
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
...
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("kafkatopic", new SimpleStringSchema(), properties);
myConsumer.setStartFromLatest();

DataStream<String> kafkaDataStream = env.addSource(myConsumer);

SingleOutputStreamOperator sourceStream = kafkaDataStream
    .map(new MapFunction() { ... });

DataStream<Row> dataStreamRow = sourceStream
    .map(new MyMapFunction())
    .filter(new RuleDataProccessFunction())
    .map(new MapFunction<MessageInfo, Row>() {
        private static final long serialVersionUID = 1L;
        @Override
        public Row map(MessageInfo value) throws Exception { ... }
    })
    .returns(new RowTypeInfo(rowTypeArr, fieldArr));

tEnv.registerFunction("test", new TestFunction());
Table table = tEnv.fromDataStream(dataStreamRow, fieldStr);
tEnv.createTemporaryView("mytable", table);

String ddl = "CREATE TABLE user_log_1155 ...from kafka topic:user_log_1155";
tEnv.executeSql(ddl);

String ddl1 = "CREATE TABLE user_test_1155 ...from kafka topic:user_test_1155";
tEnv.executeSql(ddl1);

StatementSet stmtSet = tEnv.createStatementSet();
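// ---------------------------------------------------------------------------
// (Editor's note; the description is truncated here by the list archive.)
// A hedged reading of the stack trace above: the check is raised by
// StreamContextEnvironment.validateAllowedExecution(), i.e. the job was
// submitted through the web UI (JarRunHandler -> DetachedApplicationRunner),
// which permits only one execute()/executeAsync() per program. In Flink 1.11,
// executeSql("INSERT ...") and StatementSet.execute() each submit their own
// job, so combining either of them with env.execute() in the same main()
// trips the check. A minimal sketch of the usual consolidation, with the
// statement text being purely illustrative:
stmtSet.addInsertSql("INSERT INTO user_test_1155 SELECT ... FROM mytable");
stmtSet.execute(); // the single submission; no separate env.execute() afterwards
// ---------------------------------------------------------------------------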
 

[GitHub] [flink] wangyang0918 commented on a change in pull request #13644: [FLINK-19542][k8s]Implement LeaderElectionService and LeaderRetrievalService based on Kubernetes API

2020-10-21 Thread GitBox


wangyang0918 commented on a change in pull request #13644:
URL: https://github.com/apache/flink/pull/13644#discussion_r509847440



##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/AbstractLeaderElectionService.java
##
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import java.util.UUID;
+
+/**
+ * Abstract class for leader election service based on distributed coordination system (e.g. Zookeeper, Kubernetes, etc.).
+ */
+public abstract class AbstractLeaderElectionService implements LeaderElectionService {
+
+   protected final Logger logger = LoggerFactory.getLogger(getClass());
+
+   protected final Object lock = new Object();
+
+   /** The leader contender which applies for leadership. */
+   protected volatile LeaderContender leaderContender;
+
+   private volatile UUID issuedLeaderSessionID;
+
+   protected volatile UUID confirmedLeaderSessionID;
+
+   protected volatile String confirmedLeaderAddress;
+
+   protected volatile boolean running;
+
+   protected AbstractLeaderElectionService() {
+   leaderContender = null;
+
+   issuedLeaderSessionID = null;
+   confirmedLeaderSessionID = null;
+   confirmedLeaderAddress = null;
+
+   running = false;
+   }
+
+   @Override
+   public final void start(LeaderContender contender) throws Exception {
+   Preconditions.checkNotNull(contender, "Contender must not be null.");
+   Preconditions.checkState(leaderContender == null, "Contender was already set.");
+
+   logger.info("Starting LeaderElectionService {}.", this);
+
+   synchronized (lock) {
+   leaderContender = contender;
+   running = true;
+   internalStart(contender);
+   }
+   }
+
+   @Override
+   public final void stop() throws Exception {
+   synchronized (lock) {
+   if (!running) {
+   return;
+   }
+   running = false;
+   clearConfirmedLeaderInformation();
+   }
+
+   logger.info("Stopping LeaderElectionService {}.", this);
+
+   internalStop();
+   }
+
+   @Override
+   public void confirmLeadership(UUID leaderSessionID, String leaderAddress) {
+   if (logger.isDebugEnabled()) {
+   logger.debug(
+   "Confirm leader session ID {} for leader {}.",
+   leaderSessionID,
+   leaderAddress);
+   }
+
+   Preconditions.checkNotNull(leaderSessionID);
+
+   if (checkLeaderLatch()) {

Review comment:
   This will be done in the composition refactor.
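
   For context, a sketch (editor's addition, not code from the PR) of how a concrete service could plug into this template. The `internalStart`/`internalStop` signatures are inferred from their call sites in `start()`/`stop()` above, `checkLeaderLatch` from its use in `confirmLeadership`, and everything named `My*` is an invented stand-in for the ZooKeeper/Kubernetes plumbing:

```java
import java.util.UUID;

// Invented coordination-client interface; the real implementations would wrap
// a ZooKeeper leader latch or a Kubernetes lease, as this PR does.
interface MyCoordinationClient {
    void runForLeadership(Runnable onLeadershipGranted);
    void withdrawFromElection();
    boolean hasLeadership();
}

public class MyLeaderElectionService extends AbstractLeaderElectionService {

    private final MyCoordinationClient client;

    public MyLeaderElectionService(MyCoordinationClient client) {
        this.client = client;
    }

    @Override
    protected void internalStart(LeaderContender contender) throws Exception {
        // Run for leadership; when it is granted, hand a fresh session ID to
        // the contender, which later calls back into confirmLeadership().
        client.runForLeadership(() -> contender.grantLeadership(UUID.randomUUID()));
    }

    @Override
    protected void internalStop() throws Exception {
        client.withdrawFromElection();
    }

    @Override
    protected boolean checkLeaderLatch() {
        // Guards confirmLeadership(): only persist leader information while
        // this instance actually holds the latch/lease.
        return client.hasLeadership();
    }
}
```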





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] danbosnichill commented on pull request #13711: [FLINK-19721] [flink-runtime] Support exponential backoff retries in RpcGatewayRetriever

2020-10-21 Thread GitBox


danbosnichill commented on pull request #13711:
URL: https://github.com/apache/flink/pull/13711#issuecomment-714186065


   I switched to `java.time.Duration`.
   
   I did not switch other calls to `ExponentialBackoffRetryStrategy`; I do not 
know the code base well enough to say whether that is worth doing. E.g., what 
are safe minimums? What frequency and backoff make sense?
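
   For readers following along, a sketch (editor's addition, not the PR's `ExponentialBackoffRetryStrategy`) of the delay schedule such a strategy typically produces over `java.time.Duration`; the names and numbers below are illustrative assumptions:

```java
import java.time.Duration;

// Doubles the delay on each retry attempt, capped at a maximum -- the usual
// shape of exponential backoff. Not Flink's actual RetryStrategy API.
final class BackoffSketch {

    static Duration delayForAttempt(Duration initial, Duration max, int attempt) {
        Duration delay = initial;
        for (int i = 0; i < attempt && delay.compareTo(max) < 0; i++) {
            delay = delay.multipliedBy(2);
        }
        return delay.compareTo(max) > 0 ? max : delay;
    }

    public static void main(String[] args) {
        // Delays: 100 ms, 200 ms, 400 ms, ... capped at 10 s
        // (printed in ISO-8601 form, e.g. PT0.1S).
        for (int attempt = 0; attempt < 10; attempt++) {
            System.out.println(delayForAttempt(Duration.ofMillis(100), Duration.ofSeconds(10), attempt));
        }
    }
}
```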



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] wuchong commented on a change in pull request #13634: [FLINK-19629]Fix NullPointException in avro format as null value on MAP type

2020-10-21 Thread GitBox


wuchong commented on a change in pull request #13634:
URL: https://github.com/apache/flink/pull/13634#discussion_r509846367



##
File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java
##
@@ -178,7 +178,7 @@ private static AvroToRowDataConverter createArrayConverter(ArrayType arrayType)
 
private static AvroToRowDataConverter createMapConverter(LogicalType type) {
final AvroToRowDataConverter keyConverter = createConverter(DataTypes.STRING().getLogicalType());

Review comment:
   Avro can't serialize a null key for a map. 
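
   To illustrate the point (editor's sketch, not the PR's code): Avro map keys arrive as non-null strings, so only the value side needs a null guard once the FLINK-19629 fix is in. The names below are assumptions:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Sketch of converting an Avro-style map: keys are always non-null strings,
// values may be null and must be guarded before conversion.
final class AvroMapConversionSketch {

    static Map<Object, Object> convertMap(
            Map<?, ?> avroMap,
            Function<Object, Object> keyConverter,
            Function<Object, Object> valueConverter) {
        Map<Object, Object> result = new HashMap<>(avroMap.size());
        for (Map.Entry<?, ?> entry : avroMap.entrySet()) {
            Object key = keyConverter.apply(entry.getKey());
            Object rawValue = entry.getValue();
            // The NullPointerException reported in FLINK-19629 comes from
            // converting a null MAP value without a guard like this one.
            result.put(key, rawValue == null ? null : valueConverter.apply(rawValue));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> avroLike = new HashMap<>();
        avroLike.put("a", 1);
        avroLike.put("b", null); // a null value is legal in the payload
        System.out.println(convertMap(avroLike, Object::toString, v -> v));
    }
}
```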





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



