[GitHub] [flink] flinkbot edited a comment on pull request #13678: [FLINK-19586] Add stream committer operators for new Sink API
flinkbot edited a comment on pull request #13678: URL: https://github.com/apache/flink/pull/13678#issuecomment-711439155 ## CI report: * Unknown: [CANCELED](TBD) * 463b5c8ed21f93caaeb7b938aa9e72abb35619b2 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] gm7y8 commented on pull request #13458: FLINK-18851 [runtime] Add checkpoint type to checkpoint history entries in Web UI
gm7y8 commented on pull request #13458: URL: https://github.com/apache/flink/pull/13458#issuecomment-714249342 > > > @XComp I was able to identify the code fix. I working to unit test it had some issue with Flink set up to start a job with checkpoint and savepoint. it might take a day or so as I am doing it for the first time. > > > > > > Hi @gm7y8 , thanks for getting back to us. Let's see if we can finish this before the codefreeze next week. Here a few remarks: > > > > * The build is failing for me locally (`mvn -pl flink-runtime-web -Dfast -DskipTests install`) with the following error message: > > > > ``` > > [ERROR] ERROR in src/app/pages/job/checkpoints/detail/job-checkpoints-detail.component.html(25,38): : Element implicitly has an 'any' type because type 'CheckPointDetailInterface' has no index signature. > > [ERROR] src/app/pages/job/checkpoints/detail/job-checkpoints-detail.component.html(25,38): : Property 'checkpoint_type' does not exist on type 'JobCheckpointsDetailComponent'. > > [ERROR] src/app/pages/job/checkpoints/detail/job-checkpoints-detail.component.html(25,38): : Property 'checkPointConfig' does not exist on type 'JobCheckpointsDetailComponent'. > > [ERROR] src/app/pages/job/checkpoints/detail/job-checkpoints-detail.component.html(25,38): : Element implicitly has an 'any' type because type 'CheckPointDetailInterface' has no index signature. > > [ERROR] src/app/pages/job/checkpoints/detail/job-checkpoints-detail.component.html(25,38): : Property 'checkpoint_type' does not exist on type 'JobCheckpointsDetailComponent'. > > [ERROR] src/app/pages/job/checkpoints/detail/job-checkpoints-detail.component.html(25,38): : Element implicitly has an 'any' type because type 'CheckPointDetailInterface' has no index signature. > > [ERROR] src/app/pages/job/checkpoints/detail/job-checkpoints-detail.component.html(25,38): : Property 'checkpoint_type' does not exist on type 'JobCheckpointsDetailComponent'. 
> > ``` > > > > > > It's always good to run a final `mvn install` at least on the modules you touched to verify your changes. Alternatively (or rather as an additional tool), you can set up Azure CI on your fork. That will run a full test covering all modules for each commit on your fork. Check out the [tutorial for setting up Azure CI](https://cwiki.apache.org/confluence/display/FLINK/Azure+Pipelines#AzurePipelines-Tutorial:SettingupAzurePipelinesforaforkoftheFlinkrepository). > > > > * Changes like [job-checkpoints-detail.component.html](https://github.com/apache/flink/pull/13458/commits/0b8e369aa5c8c4e4a8c370b4b848c0f197fcdb9d#diff-a1c50fd814fe8a1c03567f9c71877873251223cfaf32c7067068f7f30d8c5c42R25) do not reflect @AHeise's request, as you use uppercased tokens here. @AHeise suggested normal casing (i.e. lowercase) as it helps to improve the readability. > > * Additionally, I realized that you didn't address all points of the PR description template. Could you fix that? > > > > Thanks. > > Sure, I fixed the build issue you mentioned, and the build passes now. I will also change the letters to lowercase, and I have updated the description to follow the PR guidelines.
[jira] [Commented] (FLINK-19754) Cannot have more than one execute() or executeAsync() call in a single environment.
[ https://issues.apache.org/jira/browse/FLINK-19754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17218776#comment-17218776 ] little-tomato commented on FLINK-19754: --- Source: Custom Source -> Map -> Map -> Filter -> Map -> SourceConversion(table=[default_catalog.default_database.ruleengine], fields=[product_key, productName, device_key, deviceName, alarmControl, geoLocation, currentTemperature, airSpeed, workMode, setTemperature, powerSwitch, batteryLevel, alarmCondition, operatSwitch, lightVoltage, lightCurrent, lightIllumination, powerConsumption, tiltValue, lightStatus, originalData]) -> (Calc(select=[productName]) -> SinkConversionToRow -> Sink: KafkaTableSink(msg), Calc(select=[deviceName]) -> SinkConversionToRow -> Sink: KafkaTableSink(msg)) > Cannot have more than one execute() or executeAsync() call in a single > environment. > --- > > Key: FLINK-19754 > URL: https://issues.apache.org/jira/browse/FLINK-19754 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.11.2 >Reporter: little-tomato >Priority: Major > > I run this code on my Standalone Cluster. When i submit the job,the error log > is as follows: > {code} > 2020-10-20 11:53:42,969 WARN > org.apache.flink.client.deployment.application.DetachedApplicationRunner [] - > Could not execute application: > org.apache.flink.client.program.ProgramInvocationException: The main method > caused an error: Cannot have more than one execute() or executeAsync() call > in a single environment. 
> at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302) > ~[flink-clients_2.12-1.11.0.jar:1.11.0] > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) > ~[flink-clients_2.12-1.11.0.jar:1.11.0] > at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) > ~[flink-clients_2.12-1.11.0.jar:1.11.0] > at > org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78) > ~[flink-clients_2.12-1.11.0.jar:1.11.0] > at > org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:67) > ~[flink-clients_2.12-1.11.0.jar:1.11.0] > at > org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:100) > ~[flink-dist_2.12-1.11.2.jar:1.11.2] > at > java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) > [?:1.8.0_221] > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > [?:1.8.0_221] > at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_221] > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > [?:1.8.0_221] > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > [?:1.8.0_221] > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > [?:1.8.0_221] > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > [?:1.8.0_221] > at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221] > Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot have more > than one execute() or executeAsync() call in a single environment. 
> at > org.apache.flink.client.program.StreamContextEnvironment.validateAllowedExecution(StreamContextEnvironment.java:139) > ~[flink-clients_2.12-1.11.0.jar:1.11.0] > at > org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:127) > ~[flink-clients_2.12-1.11.0.jar:1.11.0] > at > org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76) > ~[flink-clients_2.12-1.11.0.jar:1.11.0] > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1697) > ~[flink-dist_2.12-1.11.2.jar:1.11.2] > at cn.cuiot.dmp.ruleengine.job.RuleEngineJob.main(RuleEngineJob.java:556) > ~[?:?] > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_221] > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > ~[?:1.8.0_221] > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > ~[?:1.8.0_221] > at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_221] > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) > ~[flink-clients_2.12-1.11.0.jar:1.11.0] > {code} > my code is: > {code:java} > final StreamExecutionEnvironment env = >
[jira] [Comment Edited] (FLINK-19759) DeadlockBreakupTest.testSubplanReuse_AddSingletonExchange is instable
[ https://issues.apache.org/jira/browse/FLINK-19759?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17218765#comment-17218765 ] Dian Fu edited comment on FLINK-19759 at 10/22/20, 5:54 AM: cc [~TsReaper] [~godfreyhe] was (Author: dian.fu): cc [~TsReaper] > DeadlockBreakupTest.testSubplanReuse_AddSingletonExchange is instable > - > > Key: FLINK-19759 > URL: https://issues.apache.org/jira/browse/FLINK-19759 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.12.0 >Reporter: Dian Fu >Priority: Blocker > Labels: test-stability > Fix For: 1.12.0 > > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=8052=logs=e25d5e7e-2a9c-5589-4940-0b638d75a414=a6e0f756-5bb9-5ea8-a468-5f60db442a29 > {code} > [ERROR] DeadlockBreakupTest.testSubplanReuse_AddSingletonExchange:217 > planAfter expected:<...=[>(cnt, 3)]) > : +- [SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS cnt], > reuse_id=[1]) > : +- Exchange(distribution=[single]) > :+- LocalSort]Aggregate(select=[Pa...> but was:<...=[>(cnt, 3)]) > : +- [HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS cnt], > reuse_id=[1]) > : +- Exchange(distribution=[single]) > :+- LocalHash]Aggregate(select=[Pa...> > [INFO] > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-19759) DeadlockBreakupTest.testSubplanReuse_AddSingletonExchange is instable
[ https://issues.apache.org/jira/browse/FLINK-19759?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17218772#comment-17218772 ] Dian Fu commented on FLINK-19759: - Upgrade to blocker as it's continuously failing.
[jira] [Updated] (FLINK-19759) DeadlockBreakupTest.testSubplanReuse_AddSingletonExchange is instable
[ https://issues.apache.org/jira/browse/FLINK-19759?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu updated FLINK-19759: Priority: Blocker (was: Major)
[jira] [Commented] (FLINK-19759) DeadlockBreakupTest.testSubplanReuse_AddSingletonExchange is instable
[ https://issues.apache.org/jira/browse/FLINK-19759?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17218771#comment-17218771 ] Dian Fu commented on FLINK-19759: - https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=8053=logs=e25d5e7e-2a9c-5589-4940-0b638d75a414=a6e0f756-5bb9-5ea8-a468-5f60db442a29
[jira] [Commented] (FLINK-19759) DeadlockBreakupTest.testSubplanReuse_AddSingletonExchange is instable
[ https://issues.apache.org/jira/browse/FLINK-19759?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17218765#comment-17218765 ] Dian Fu commented on FLINK-19759: - cc [~TsReaper]
[jira] [Created] (FLINK-19759) DeadlockBreakupTest.testSubplanReuse_AddSingletonExchange is instable
Dian Fu created FLINK-19759: --- Summary: DeadlockBreakupTest.testSubplanReuse_AddSingletonExchange is instable Key: FLINK-19759 URL: https://issues.apache.org/jira/browse/FLINK-19759 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.12.0 Reporter: Dian Fu Fix For: 1.12.0 https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=8052=logs=e25d5e7e-2a9c-5589-4940-0b638d75a414=a6e0f756-5bb9-5ea8-a468-5f60db442a29 {code} [ERROR] DeadlockBreakupTest.testSubplanReuse_AddSingletonExchange:217 planAfter expected:<...=[>(cnt, 3)]) : +- [SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS cnt], reuse_id=[1]) : +- Exchange(distribution=[single]) :+- LocalSort]Aggregate(select=[Pa...> but was:<...=[>(cnt, 3)]) : +- [HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS cnt], reuse_id=[1]) : +- Exchange(distribution=[single]) :+- LocalHash]Aggregate(select=[Pa...> [INFO] {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-19759) DeadlockBreakupTest.testSubplanReuse_AddSingletonExchange is instable
[ https://issues.apache.org/jira/browse/FLINK-19759?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu updated FLINK-19759: Labels: test-stability (was: )
[GitHub] [flink] guoweiM commented on pull request #13678: [FLINK-19586] Add stream committer operators for new Sink API
guoweiM commented on pull request #13678: URL: https://github.com/apache/flink/pull/13678#issuecomment-714246544 @flinkbot run azure
[GitHub] [flink] godfreyhe merged pull request #13737: [hotfix] Fix DeadlockBreakupTest due to Calcite upgrade
godfreyhe merged pull request #13737: URL: https://github.com/apache/flink/pull/13737
[jira] [Commented] (FLINK-19757) TimeStampData can cause time inconsistent problem
[ https://issues.apache.org/jira/browse/FLINK-19757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17218759#comment-17218759 ] xiaogang zhou commented on FLINK-19757: --- [~jark] Thanks for commenting. If I run the SQL tEnv.executeSql("CREATE TABLE Source (\n" + " -- declare the schema of the table\n" + " `name` STRING,\n" + " `num` INT,\n" + " `xtime` as proctime()\n"+ ") WITH (\n" + " -- declare the external system to connect to\n" + " 'connector' = 'bsql-datagen',\n" + " 'rows-per-second' = '1'\n" + ")"); and then call stream.print(), I get a time 8 hours in the past (my default time zone is +8). How can I fix this problem? Furthermore, if I print Timestamp ts = new Timestamp(System.currentTimeMillis()); System.err.println(TimestampData.fromTimestamp(ts)); //2020-10-22T13:40:28.596 System.err.println(TimestampData.fromEpochMillis(System.currentTimeMillis())); //2020-10-22T05:40:28.724 the two results differ by 8 hours, which is pretty strange. Can you please suggest how I can get the correct time? 
thx for your time > TimeStampData can cause time inconsistent problem > - > > Key: FLINK-19757 > URL: https://issues.apache.org/jira/browse/FLINK-19757 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Runtime >Affects Versions: 1.11.1 >Reporter: xiaogang zhou >Priority: Major > Labels: pull-request-available > > when we check jdk LocalDateTime code,we find that > > {code:java} > // code placeholder > public static LocalDateTime ofEpochSecond(long epochSecond, int nanoOfSecond, > ZoneOffset offset) { > Objects.requireNonNull(offset, "offset"); > NANO_OF_SECOND.checkValidValue(nanoOfSecond); > long localSecond = epochSecond + offset.getTotalSeconds(); // overflow > caught later > long localEpochDay = Math.floorDiv(localSecond, SECONDS_PER_DAY); > int secsOfDay = (int)Math.floorMod(localSecond, SECONDS_PER_DAY); > LocalDate date = LocalDate.ofEpochDay(localEpochDay); > LocalTime time = LocalTime.ofNanoOfDay(secsOfDay * NANOS_PER_SECOND + > nanoOfSecond); > return new LocalDateTime(date, time); > } > {code} > > offset.getTotalSeconds() they add the offset, but in the TimeStampData > toLocalDateTime, we don't add a offset. > > I'd like to add a TimeZone.getDefault().getRawOffset() in the > toLocalDateTime() > and minus a TimeZone.getDefault().getRawOffset() in the > fromLocalDateTime -- This message was sent by Atlassian Jira (v8.3.4#803005)
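The 8-hour gap reported in this comment can be reproduced with plain `java.time`, without any Flink classes. The following is a minimal sketch, not Flink's implementation: the class and method names are made up for illustration, and the fixed `+08:00` offset is an assumption standing in for the reporter's default time zone. It only shows that the same epoch value yields two different wall-clock times depending on whether a zone offset is applied during the conversion.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

// Hypothetical demo class; it does not use or mirror Flink's TimestampData.
final class EpochToLocalDateTimeDemo {

    /** Wall-clock time for the epoch value with no zone offset applied (UTC). */
    static LocalDateTime utcWallClock(long epochMillis) {
        long seconds = Math.floorDiv(epochMillis, 1000L);
        int nanos = ((int) Math.floorMod(epochMillis, 1000L)) * 1_000_000;
        return LocalDateTime.ofEpochSecond(seconds, nanos, ZoneOffset.UTC);
    }

    /** Wall-clock time for the same instant as seen in an explicit zone. */
    static LocalDateTime zonedWallClock(long epochMillis, ZoneId zone) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), zone);
    }

    public static void main(String[] args) {
        long epochMillis = 1_603_345_228_596L;
        // Assumed stand-in for a JVM whose default zone is UTC+8.
        ZoneId plusEight = ZoneOffset.ofHours(8);

        System.out.println(utcWallClock(epochMillis));              // 2020-10-22T05:40:28.596
        System.out.println(zonedWallClock(epochMillis, plusEight)); // 2020-10-22T13:40:28.596
    }
}
```

`java.sql.Timestamp#toLocalDateTime()` interprets the value in the JVM default time zone, which is consistent with the first printed value in the comment being 8 hours ahead of the second one.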
[GitHub] [flink] KarmaGYZ commented on pull request #13581: [FLINK-17331] Explicitly get the ByteBuf length of all classes which …
KarmaGYZ commented on pull request #13581: URL: https://github.com/apache/flink/pull/13581#issuecomment-714242065 cc @zhijiangW
[GitHub] [flink] wangyang0918 commented on a change in pull request #13644: [FLINK-19542][k8s]Implement LeaderElectionService and LeaderRetrievalService based on Kubernetes API
wangyang0918 commented on a change in pull request #13644: URL: https://github.com/apache/flink/pull/13644#discussion_r509891311 ## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java ## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.kubeclient.resources; + +import org.apache.flink.kubernetes.kubeclient.KubernetesLeaderElectionConfiguration; + +import io.fabric8.kubernetes.client.NamespacedKubernetesClient; +import io.fabric8.kubernetes.client.extended.leaderelection.LeaderCallbacks; +import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElectionConfigBuilder; +import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector; +import io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.ConfigMapLock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.UUID; + +/** + * Represent {@link KubernetesLeaderElector} in kubernetes. {@link LeaderElector#run()} is a blocking call. It should be + * run in the IO executor, not the main thread. The lifecycle is bound to single leader election. 
Once the leadership + * is revoked, as well as the {@link LeaderCallbackHandler#notLeader()} is called, the {@link LeaderElector#run()} will + * finish. To start another round of election, we need to trigger again. + */ +public class KubernetesLeaderElector extends LeaderElector { + + private static final Logger LOG = LoggerFactory.getLogger(KubernetesLeaderElector.class); + protected static final String LOCK_IDENTITY = UUID.randomUUID().toString(); Review comment: Maybe I did not make myself clear. After the refactoring, `lockIdentity` will be a non-static field in `KubernetesHaServices`, which means each `KubernetesHaServices` instance will have one dedicated lock identity shared by all the components above, while different instances will have different identities.
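The difference between a static `LOCK_IDENTITY` and a per-instance `lockIdentity` can be illustrated with a small, self-contained sketch. The classes below are hypothetical stand-ins, not the classes from the pull request: `HaServicesSketch` plays the role of the HA services object, and `ElectorSketch` the role of a per-component leader elector.

```java
import java.util.UUID;

// Hypothetical stand-in for the HA services object described in the review comment.
final class HaServicesSketch {
    // Non-static field: one identity per instance, not one per class.
    private final String lockIdentity = UUID.randomUUID().toString();

    /** Every elector created by this instance receives the same identity. */
    ElectorSketch createElector(String componentName) {
        return new ElectorSketch(componentName, lockIdentity);
    }
}

// Hypothetical per-component elector that only records the identity it was given.
final class ElectorSketch {
    final String componentName;
    final String lockIdentity;

    ElectorSketch(String componentName, String lockIdentity) {
        this.componentName = componentName;
        this.lockIdentity = lockIdentity;
    }
}

final class LockIdentityDemo {
    public static void main(String[] args) {
        HaServicesSketch first = new HaServicesSketch();
        HaServicesSketch second = new HaServicesSketch();

        ElectorSketch rm = first.createElector("resourcemanager");
        ElectorSketch dispatcher = first.createElector("dispatcher");
        ElectorSketch otherRm = second.createElector("resourcemanager");

        // Components created by the same services instance share an identity.
        System.out.println(rm.lockIdentity.equals(dispatcher.lockIdentity));
        // A second services instance contends with a different identity.
        System.out.println(rm.lockIdentity.equals(otherRm.lockIdentity));
    }
}
```

With a static field, two services objects loaded by the same class loader would present the same identity, which is the situation the non-static field avoids.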
[GitHub] [flink] flinkbot edited a comment on pull request #13690: [FLINK-16595][YARN]support more HDFS nameServices in yarn mode when security enabled. Is…
flinkbot edited a comment on pull request #13690: URL: https://github.com/apache/flink/pull/13690#issuecomment-712304240 ## CI report: * 518581ab52a2d976ff344404a6828cec23c260f9 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8054) Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8039)
[GitHub] [flink] flinkbot edited a comment on pull request #13739: [FLINK-19232][python] support iterating MapState and MapView
flinkbot edited a comment on pull request #13739: URL: https://github.com/apache/flink/pull/13739#issuecomment-714232634 ## CI report: * 9f75014b48308eb93937e11fcd8f4da80de12bd4 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8074)
[GitHub] [flink] wangyang0918 commented on a change in pull request #13644: [FLINK-19542][k8s]Implement LeaderElectionService and LeaderRetrievalService based on Kubernetes API
wangyang0918 commented on a change in pull request #13644: URL: https://github.com/apache/flink/pull/13644#discussion_r50975 ## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionService.java ## @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.kubernetes.highavailability; + +import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; +import org.apache.flink.kubernetes.kubeclient.KubernetesLeaderElectionConfiguration; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch; +import org.apache.flink.kubernetes.utils.KubernetesUtils; +import org.apache.flink.runtime.leaderelection.AbstractLeaderElectionService; +import org.apache.flink.runtime.leaderelection.LeaderContender; +import org.apache.flink.util.function.FunctionUtils; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY; +import static org.apache.flink.kubernetes.utils.Constants.LEADER_ADDRESS_KEY; +import static org.apache.flink.kubernetes.utils.Constants.LEADER_SESSION_ID_KEY; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Leader election service for multiple JobManagers. The active JobManager is elected using Kubernetes. + * The current leader's address as well as its leader session ID is published via Kubernetes ConfigMap. + * Note that the contending lock and leader storage are using the same ConfigMap. And every component(e.g. + * ResourceManager, Dispatcher, RestEndpoint, JobManager for each job) will have a separate ConfigMap. 
+ */ +public class KubernetesLeaderElectionService extends AbstractLeaderElectionService { + + private final FlinkKubeClient kubeClient; + + private final Executor executor; + + private final String configMapName; + + private final KubernetesLeaderElector leaderElector; + + private KubernetesWatch kubernetesWatch; + + // Labels will be used to clean up the ha related ConfigMaps. + private Map configMapLabels; + + KubernetesLeaderElectionService( + FlinkKubeClient kubeClient, + Executor executor, + KubernetesLeaderElectionConfiguration leaderConfig) { + + this.kubeClient = checkNotNull(kubeClient, "Kubernetes client should not be null."); + this.executor = checkNotNull(executor, "Executor should not be null."); + this.configMapName = leaderConfig.getConfigMapName(); + this.leaderElector = kubeClient.createLeaderElector(leaderConfig, new LeaderCallbackHandlerImpl()); + this.leaderContender = null; + this.configMapLabels = KubernetesUtils.getConfigMapLabels( + leaderConfig.getClusterId(), LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY); + } + + @Override + public void internalStart(LeaderContender contender) { + CompletableFuture.runAsync(leaderElector::run, executor); + kubernetesWatch = kubeClient.watchConfigMaps(configMapName, new ConfigMapCallbackHandlerImpl()); + } + + @Override + public void internalStop() { + if (kubernetesWatch != null) { + kubernetesWatch.close(); + } + } + + @Override + protected void writeLeaderInformation() { + try { + kubeClient.checkAndUpdateConfigMap( + configMapName, + configMap -> { + if (leaderElector.hasLeadership(configMap)) { + // Get the updated ConfigMap with new leader information +
[GitHub] [flink] gm7y8 removed a comment on pull request #13458: FLINK-18851 [runtime] Add checkpoint type to checkpoint history entries in Web UI
gm7y8 removed a comment on pull request #13458: URL: https://github.com/apache/flink/pull/13458#issuecomment-714126117 @flinkbot approve description This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-19743) Add Source metrics definitions
[ https://issues.apache.org/jira/browse/FLINK-19743?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jiangjie Qin updated FLINK-19743:
---------------------------------
Summary: Add Source metrics definitions (was: Add metrics definitions.)

> Add Source metrics definitions
> ------------------------------
>
> Key: FLINK-19743
> URL: https://issues.apache.org/jira/browse/FLINK-19743
> Project: Flink
> Issue Type: Sub-task
> Components: Connectors / Common
> Affects Versions: 1.11.2
> Reporter: Jiangjie Qin
> Assignee: Jiangjie Qin
> Priority: Major
>
> Add the metrics defined in
> [FLIP-33|https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics]
> to {{OperatorMetricsGroup}} and {{SourceReaderContext}}

--
This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] curcur edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery
curcur edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-714211577

> * Visibility in normal case: none of the fields written in `releaseView` are `volatile`. So in normal case (`t1:release` then `t2:createReadView`) `t2` can see some inconsistent state. For example, `readView == null`, but `isPartialBufferCleanupRequired == false`. Right?
>   Maybe call `releaseView()` from `createReadView()` unconditionally?

Aren't they guarded by the synchronization block?

> * Overwrites when release is slow: won't `t1` overwrite changes to `PipelinedSubpartition` made already by `t2`? For example, reset `sequenceNumber` after `t2` has sent some data?
>   Maybe `PipelinedSubpartition.readerView` should be `AtomicReference` and then we can guard `PipelinedApproximateSubpartition.releaseView()` by CAS on it?

I think this is the same question I answered in the write-up. In short, it won't be possible, because a view can only be released once, and this is guarded by the release flag of the view. Details are quoted below.

- What if netty thread1 releases the view after netty thread2 recreates the view?
  Thread2 releases the view that thread1 holds a reference to before creating a new view. Thread1 cannot release the old view (through its view reference) again afterwards, since a view can only be released once.

I actually have an idea to simplify this whole model: **If we only release before creation and in no other place, the whole threading interaction model would be greatly simplified. That means only one new netty thread can release the view.**
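The reviewer's suggestion of guarding the release with a CAS on an `AtomicReference` can be illustrated with a minimal sketch. This is not Flink's code: `CasGuardedReleaseSketch`, `View`, `Subpartition`, and `current()` are hypothetical stand-ins, and the sketch only assumes that a release should take effect when the subpartition still points at the view being released.

```java
import java.util.concurrent.atomic.AtomicReference;

class CasGuardedReleaseSketch {

    /** Hypothetical stand-in for a read view; not Flink's class. */
    static final class View {
        final int id;

        View(int id) {
            this.id = id;
        }
    }

    /** Hypothetical stand-in for a subpartition that tracks its current view. */
    static final class Subpartition {
        private final AtomicReference<View> readerView = new AtomicReference<>();

        View createReadView(int id) {
            View view = new View(id);
            readerView.set(view); // the new view replaces whatever was there
            return view;
        }

        /** Releases only if {@code expected} is still the current view. */
        boolean releaseView(View expected) {
            // CAS fails when another thread has already installed a newer view,
            // so a slow release cannot clobber the newer view's state.
            return readerView.compareAndSet(expected, null);
        }

        View current() {
            return readerView.get();
        }
    }

    public static void main(String[] args) {
        Subpartition subpartition = new Subpartition();
        View v1 = subpartition.createReadView(1); // held by "thread1"
        View v2 = subpartition.createReadView(2); // "thread2" recreates the view

        System.out.println(subpartition.releaseView(v1));   // false: v1 is stale
        System.out.println(subpartition.current() == v2);   // true: v2 untouched
        System.out.println(subpartition.releaseView(v2));   // true
    }
}
```

The trade-off against the release-flag approach described in the reply is that the CAS ties the release to the identity of the current view, whereas a flag on the view only guarantees that each view is released at most once.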
[GitHub] [flink] curcur edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery
curcur edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-714211577

> * Visibility in normal case: none of the fields written in `releaseView` are `volatile`. So in normal case (`t1:release` then `t2:createReadView`) `t2` can see some inconsistent state. For example, `readView == null`, but `isPartialBufferCleanupRequired == false`. Right?
>   Maybe call `releaseView()` from `createReadView()` unconditionally?

Aren't they guarded by the synchronization block?

> * Overwrites when release is slow: won't `t1` overwrite changes to `PipelinedSubpartition` made already by `t2`? For example, reset `sequenceNumber` after `t2` has sent some data?
>   Maybe `PipelinedSubpartition.readerView` should be `AtomicReference` and then we can guard `PipelinedApproximateSubpartition.releaseView()` by CAS on it?

I think this is the same question I answered in the write-up. In short, it won't be possible, because a view can only be released once, and this is guarded by the release flag of the view. Details are quoted below.

- What if netty thread1 releases the view after netty thread2 recreates the view?
  Thread2 releases the view that thread1 holds a reference to before creating a new view. Thread1 cannot release the old view (through its view reference) again afterwards, since a view can only be released once.

I actually have an idea to simplify this whole model: **If we only release before creation, the whole threading interaction model would be greatly simplified. That means only one new netty thread can release the view.**
[GitHub] [flink] flinkbot commented on pull request #13739: [FLINK-19232][python] support iterating MapState and MapView
flinkbot commented on pull request #13739:
URL: https://github.com/apache/flink/pull/13739#issuecomment-714232634

## CI report:

* 9f75014b48308eb93937e11fcd8f4da80de12bd4 UNKNOWN

Bot commands
The @flinkbot bot supports the following commands:

- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] flinkbot edited a comment on pull request #13738: [FLINK-19757][flink-table-common] fix the local datetime inconsistent…
flinkbot edited a comment on pull request #13738:
URL: https://github.com/apache/flink/pull/13738#issuecomment-714225386

## CI report:

* 065898f3ea61da7479e614517c03acfbce10ebfb Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8073)

Bot commands
The @flinkbot bot supports the following commands:

- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] flinkbot edited a comment on pull request #13721: [FLINK-19694][table] Support Upsert ChangelogMode for ScanTableSource
flinkbot edited a comment on pull request #13721:
URL: https://github.com/apache/flink/pull/13721#issuecomment-713489063

## CI report:

* 1e1971368a88e3457dbf60cecfe319d58649092d Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8025)
* fa0c87c737cee61fe6fd5b6655916eb97d18aaeb Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8072)

Bot commands
The @flinkbot bot supports the following commands:

- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
[jira] [Created] (FLINK-19758) Implement a new unified File Sink based on the new Sink API
Yun Gao created FLINK-19758:
-------------------------------
Summary: Implement a new unified File Sink based on the new Sink API
Key: FLINK-19758
URL: https://issues.apache.org/jira/browse/FLINK-19758
Project: Flink
Issue Type: Sub-task
Components: API / DataStream, Connectors / FileSystem
Affects Versions: 1.12.0
Reporter: Yun Gao
Fix For: 1.12.0
[GitHub] [flink] curcur edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery
curcur edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-714211577

> * Visibility in normal case: none of the fields written in `releaseView` are `volatile`. So in normal case (`t1:release` then `t2:createReadView`) `t2` can see some inconsistent state. For example, `readView == null`, but `isPartialBufferCleanupRequired == false`. Right?
>   Maybe call `releaseView()` from `createReadView()` unconditionally?

Aren't they guarded by the synchronization block?

> * Overwrites when release is slow: won't `t1` overwrite changes to `PipelinedSubpartition` made already by `t2`? For example, reset `sequenceNumber` after `t2` has sent some data?
>   Maybe `PipelinedSubpartition.readerView` should be `AtomicReference` and then we can guard `PipelinedApproximateSubpartition.releaseView()` by CAS on it?

I think this is the same question I answered in the write-up. In short, it won't be possible, because a view can only be released once, and this is guarded by the release flag of the view. Details are quoted below.

- What if netty thread1 releases the view after netty thread2 recreates the view?
  Thread2 releases the view that thread1 holds a reference to before creating a new view. Thread1 cannot release the old view (through its view reference) again afterwards, since a view can only be released once.

I actually have an idea to simplify this whole model: **If we only release before creation, the whole threading interaction model would be greatly simplified. That means only one new netty thread can release the view.**

I can't see any potential risk that would prevent us from doing this.
[GitHub] [flink] curcur edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery
curcur edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-714211577

> * Visibility in normal case: none of the fields written in `releaseView` are `volatile`. So in normal case (`t1:release` then `t2:createReadView`) `t2` can see some inconsistent state. For example, `readView == null`, but `isPartialBufferCleanupRequired == false`. Right?
>   Maybe call `releaseView()` from `createReadView()` unconditionally?

Aren't they guarded by the synchronization block?

> * Overwrites when release is slow: won't `t1` overwrite changes to `PipelinedSubpartition` made already by `t2`? For example, reset `sequenceNumber` after `t2` has sent some data?
>   Maybe `PipelinedSubpartition.readerView` should be `AtomicReference` and then we can guard `PipelinedApproximateSubpartition.releaseView()` by CAS on it?

I think this is the same question I answered in the write-up. In short, it won't be possible, because a view can only be released once, and this is guarded by the release flag of the view. Details are quoted below.

- What if netty thread1 releases the view after netty thread2 recreates the view?
  Thread2 releases the view that thread1 holds a reference to before creating a new view. Thread1 cannot release the old view (through its view reference) again afterwards, since a view can only be released once.

**And if we only release before creation, the whole threading interaction model would be greatly simplified. That means only one new netty thread can release the view.**

I can't see any potential risk that would prevent us from doing this.
[GitHub] [flink] flinkbot commented on pull request #13739: [FLINK-19232][python] support iterating MapState and MapView
flinkbot commented on pull request #13739:
URL: https://github.com/apache/flink/pull/13739#issuecomment-714227770

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

## Automated Checks

Last check on commit 9f75014b48308eb93937e11fcd8f4da80de12bd4 (Thu Oct 22 05:01:49 UTC 2020)

**Warnings:**

* No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

## Review Progress

* ❓ 1. The [description] looks good.
* ❓ 2. There is [consensus] that the contribution should go into Flink.
* ❓ 3. Needs [attention] from.
* ❓ 4. The change fits into the overall [architecture].
* ❓ 5. Overall code [quality] is good.

Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.

The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:

- `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until `architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] WeiZhong94 opened a new pull request #13739: [FLINK-19232][python] support iterating MapState and MapView
WeiZhong94 opened a new pull request #13739:
URL: https://github.com/apache/flink/pull/13739

## What is the purpose of the change

*This pull request supports iterating MapState and MapView*

## Brief change log

- *Support iterating MapState and MapView*

## Verifying this change

This change is already covered by existing tests.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented? (docs)
[GitHub] [flink] flinkbot commented on pull request #13738: [FLINK-19757][flink-table-common] fix the local datetime inconsistent…
flinkbot commented on pull request #13738:
URL: https://github.com/apache/flink/pull/13738#issuecomment-714225386

## CI report:

* 065898f3ea61da7479e614517c03acfbce10ebfb UNKNOWN

Bot commands
The @flinkbot bot supports the following commands:

- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] flinkbot edited a comment on pull request #13729: [FLINK-19644][hive] Support read specific partition of Hive table in temporal join
flinkbot edited a comment on pull request #13729:
URL: https://github.com/apache/flink/pull/13729#issuecomment-713673157

## CI report:

* f028c8d52466fbd6fd5a9cbf3ed85dcbc65fd2ac Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8068)

Bot commands
The @flinkbot bot supports the following commands:

- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] flinkbot edited a comment on pull request #13736: [FLINK-19654][python][e2e] Reduce pyflink e2e test parallelism
flinkbot edited a comment on pull request #13736:
URL: https://github.com/apache/flink/pull/13736#issuecomment-714183394

## CI report:

* 259c893dcb4b5692df54cfe7739f95d9e34096e6 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8056)
* 111ec785929a0742b46ea98408f585aa03314b1d Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8070)

Bot commands
The @flinkbot bot supports the following commands:

- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] flinkbot edited a comment on pull request #13721: [FLINK-19694][table] Support Upsert ChangelogMode for ScanTableSource
flinkbot edited a comment on pull request #13721:
URL: https://github.com/apache/flink/pull/13721#issuecomment-713489063

## CI report:

* 1e1971368a88e3457dbf60cecfe319d58649092d Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8025)
* fa0c87c737cee61fe6fd5b6655916eb97d18aaeb UNKNOWN

Bot commands
The @flinkbot bot supports the following commands:

- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] flinkbot edited a comment on pull request #13653: [FLINK-17528][table] Remove RowData#get() and ArrayData#get() and use FieldGetter and ElementGetter instead
flinkbot edited a comment on pull request #13653:
URL: https://github.com/apache/flink/pull/13653#issuecomment-709337488

## CI report:

* cadf9a2a13d3113395f199431881fff216b9a50a Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7974) Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7958) Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8017)
* 0c876d6befc487412629e6a1e8883fa5fbeb31e6 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8067) Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8071)

Bot commands
The @flinkbot bot supports the following commands:

- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] flinkbot edited a comment on pull request #13581: [FLINK-17331] Explicitly get the ByteBuf length of all classes which …
flinkbot edited a comment on pull request #13581:
URL: https://github.com/apache/flink/pull/13581#issuecomment-706519397

## CI report:

* 1e187203b15559b08d15a0cca949ca15a8a3bafe Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7383)
* fcc301fbb0d04ada457d2c39d325ede52cc0db8d Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8069)

Bot commands
The @flinkbot bot supports the following commands:

- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] wuchong commented on pull request #13653: [FLINK-17528][table] Remove RowData#get() and ArrayData#get() and use FieldGetter and ElementGetter instead
wuchong commented on pull request #13653:
URL: https://github.com/apache/flink/pull/13653#issuecomment-714215751

@flinkbot run azure
[jira] [Commented] (FLINK-19757) TimeStampData can cause time inconsistent problem
[ https://issues.apache.org/jira/browse/FLINK-19757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17218728#comment-17218728 ]

Jark Wu commented on FLINK-19757:
---------------------------------

Hi [~zhoujira86], the current implementation is correct. The conversion between {{LocalDateTime}} and {{TimestampData}} happens when the SQL type is {{TIMESTAMP}}. The {{TIMESTAMP}} type is a value without time zone, which has the same semantics as {{LocalDateTime}}. We store such data in Flink using the epoch seconds since {{1970-01-01 00:00:00}}. Therefore, we shouldn't add time zone offsets to the method. Otherwise, the logic is wrong and tests will fail.

> TimeStampData can cause time inconsistent problem
> -------------------------------------------------
>
> Key: FLINK-19757
> URL: https://issues.apache.org/jira/browse/FLINK-19757
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Runtime
> Affects Versions: 1.11.1
> Reporter: xiaogang zhou
> Priority: Major
> Labels: pull-request-available
>
> When we check the JDK LocalDateTime code, we find that
>
> {code:java}
> // code placeholder
> public static LocalDateTime ofEpochSecond(long epochSecond, int nanoOfSecond, ZoneOffset offset) {
>     Objects.requireNonNull(offset, "offset");
>     NANO_OF_SECOND.checkValidValue(nanoOfSecond);
>     long localSecond = epochSecond + offset.getTotalSeconds();  // overflow caught later
>     long localEpochDay = Math.floorDiv(localSecond, SECONDS_PER_DAY);
>     int secsOfDay = (int)Math.floorMod(localSecond, SECONDS_PER_DAY);
>     LocalDate date = LocalDate.ofEpochDay(localEpochDay);
>     LocalTime time = LocalTime.ofNanoOfDay(secsOfDay * NANOS_PER_SECOND + nanoOfSecond);
>     return new LocalDateTime(date, time);
> }
> {code}
>
> With offset.getTotalSeconds() they add the offset, but in the TimeStampData toLocalDateTime we don't add an offset.
>
> I'd like to add TimeZone.getDefault().getRawOffset() in toLocalDateTime() and subtract TimeZone.getDefault().getRawOffset() in fromLocalDateTime.
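The point made in the comment can be checked with plain JDK calls. The following is a minimal sketch, not Flink's `TimestampData` code: `TimestampRoundTrip`, `encode`, `decode`, and `decodeWithOffset` are hypothetical names, and the sketch assumes the stored value is the epoch-second count of the wall-clock value with no zone applied.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;

class TimestampRoundTrip {

    /** Encodes a zone-less wall-clock value as seconds since 1970-01-01 00:00:00. */
    static long encode(LocalDateTime value) {
        // ZoneOffset.UTC contributes an offset of zero, so nothing is shifted.
        return value.toEpochSecond(ZoneOffset.UTC);
    }

    /** Decodes without applying any zone offset; the inverse of encode. */
    static LocalDateTime decode(long epochSecond) {
        return LocalDateTime.ofEpochSecond(epochSecond, 0, ZoneOffset.UTC);
    }

    /** Decodes while adding an offset, as the issue proposes for toLocalDateTime(). */
    static LocalDateTime decodeWithOffset(long epochSecond, ZoneOffset offset) {
        return LocalDateTime.ofEpochSecond(epochSecond, 0, offset);
    }

    public static void main(String[] args) {
        LocalDateTime original = LocalDateTime.of(2020, 10, 22, 4, 13, 34);
        long stored = encode(original);

        System.out.println(decode(stored).equals(original));               // true
        System.out.println(decodeWithOffset(stored, ZoneOffset.ofHours(8))); // 2020-10-22T12:13:34
    }
}
```

Decoding with a non-zero offset on one side only shifts the wall-clock value, which is why a zone-less `TIMESTAMP` should be converted without any offset in both directions.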
[GitHub] [flink] curcur edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery
curcur edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-714211577

> * Visibility in normal case: none of the fields written in `releaseView` are `volatile`. So in normal case (`t1:release` then `t2:createReadView`) `t2` can see some inconsistent state. For example, `readView == null`, but `isPartialBufferCleanupRequired == false`. Right?
>   Maybe call `releaseView()` from `createReadView()` unconditionally?

That's true. In that case, should we not call `releaseView()` during downstream task cancellation, and call `releaseView()` only before creating a new view?

> * Overwrites when release is slow: won't `t1` overwrite changes to `PipelinedSubpartition` made already by `t2`? For example, reset `sequenceNumber` after `t2` has sent some data?
>   Maybe `PipelinedSubpartition.readerView` should be `AtomicReference` and then we can guard `PipelinedApproximateSubpartition.releaseView()` by CAS on it?

I think this is the same question I answered in the write-up. In short, it won't be possible, because a view can only be released once, and this is guarded by the release flag of the view. Details are quoted below.

- What if netty thread1 releases the view after netty thread2 recreates the view?
  Thread2 releases the view that thread1 holds a reference to before creating a new view. Thread1 cannot release the old view (through its view reference) again afterwards, since a view can only be released once.

**And if we only release before creation, the whole threading interaction model would be greatly simplified. That means only one new netty thread can release the view.**

I can't see any potential risk that would prevent us from doing this.
[GitHub] [flink] curcur edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery
curcur edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-714211577

> * Visibility in normal case: none of the fields written in `releaseView` are `volatile`. So in normal case (`t1:release` then `t2:createReadView`) `t2` can see some inconsistent state. For example, `readView == null`, but `isPartialBufferCleanupRequired == false`. Right?
>   Maybe call `releaseView()` from `createReadView()` unconditionally?

That's true. In that case, should we not call `releaseView()` during downstream task cancellation, and call `releaseView()` only before creating a new view?

> * Overwrites when release is slow: won't `t1` overwrite changes to `PipelinedSubpartition` made already by `t2`? For example, reset `sequenceNumber` after `t2` has sent some data?
>   Maybe `PipelinedSubpartition.readerView` should be `AtomicReference` and then we can guard `PipelinedApproximateSubpartition.releaseView()` by CAS on it?

I think this is the same question I answered in the write-up. In short, it won't be possible, because a view can only be released once, and this is guarded by the release flag of the view. Details are quoted below.

- What if netty thread1 releases the view after netty thread2 recreates the view?
  Thread2 releases the view that thread1 holds a reference to before creating a new view. Thread1 cannot release the old view (through its view reference) again afterwards, since a view can only be released once.

**And if we only release before creation, the whole threading interaction model would be greatly simplified. That means only one netty thread can release the view.**

I can't see any potential risk that would prevent us from doing this.
[GitHub] [flink] curcur edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery
curcur edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-714211577

> * Visibility in normal case: none of the fields written in `releaseView` are `volatile`. So in normal case (`t1:release` then `t2:createReadView`) `t2` can see some inconsistent state. For example, `readView == null`, but `isPartialBufferCleanupRequired == false`. Right?
>   Maybe call `releaseView()` from `createReadView()` unconditionally?

That's true. In that case, should we not call `releaseView()` during downstream task cancellation, and call `releaseView()` only before creating a new view?

> * Overwrites when release is slow: won't `t1` overwrite changes to `PipelinedSubpartition` made already by `t2`? For example, reset `sequenceNumber` after `t2` has sent some data?
>   Maybe `PipelinedSubpartition.readerView` should be `AtomicReference` and then we can guard `PipelinedApproximateSubpartition.releaseView()` by CAS on it?

I think this is the same question I answered in the write-up. In short, it won't be possible, because a view can only be released once, and this is guarded by the release flag of the view. Details are quoted below.

- What if netty thread1 releases the view after netty thread2 recreates the view?
  Thread2 releases the view that thread1 holds a reference to before creating a new view. Thread1 cannot release the old view (through its view reference) again afterwards, since a view can only be released once.

**And if we only release before creation, the whole threading interaction model would be greatly simplified.**

I can't see any potential risk that would prevent us from doing this.
[jira] [Commented] (FLINK-19687) Support to get execution plan in `StatementSet`
[ https://issues.apache.org/jira/browse/FLINK-19687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17218724#comment-17218724 ]

xiaozilong commented on FLINK-19687:
------------------------------------

cc [~godfreyhe]

> Support to get execution plan in `StatementSet`
> -----------------------------------------------
>
> Key: FLINK-19687
> URL: https://issues.apache.org/jira/browse/FLINK-19687
> Project: Flink
> Issue Type: Wish
> Components: Table SQL / API
> Affects Versions: 1.11.0
> Reporter: xiaozilong
> Priority: Major
>
> Hi, I want to get a job's execution plan in Flink SQL 1.11, but I get the
> exception "No operators defined in streaming topology. Cannot execute." when
> using `env.getExecutionPlan()`. The same code runs fine in Flink SQL 1.10. I
> found that translation only happens when StatementSet.execute() is
> called in Flink SQL 1.11. So can we not get the job's execution plan before the
> job is submitted? Can we support getting the execution plan in `StatementSet`, or invoke
> the method `TableEnvironmentImpl#translate` in `StatementSetImpl#addInsertSql`? I
> think the latter is better so that we can reuse `env.getExecutionPlan()`.
[GitHub] [flink] curcur edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery
curcur edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-714211577

> * Visibility in normal case: none of the fields written in `releaseView` are `volatile`. So in normal case (`t1:release` then `t2:createReadView`) `t2` can see some inconsistent state. For example, `readView == null`, but `isPartialBufferCleanupRequired == false`. Right?
>   Maybe call `releaseView()` from `createReadView()` unconditionally?

That's true. In that case, should we not call `releaseView()` during downstream task cancellation, and call `releaseView()` only before creating a new view?

> * Overwrites when release is slow: won't `t1` overwrite changes to `PipelinedSubpartition` made already by `t2`? For example, reset `sequenceNumber` after `t2` has sent some data?
>   Maybe `PipelinedSubpartition.readerView` should be `AtomicReference` and then we can guard `PipelinedApproximateSubpartition.releaseView()` by CAS on it?

I think this is the same question I answered in the write-up. In short, it won't be possible, because a view can only be released once, and this is guarded by the release flag of the view. Details are quoted below.

- What if netty thread1 releases the view after netty thread2 recreates the view?
  Thread2 releases the view that thread1 holds a reference to before creating a new view. Thread1 cannot release the old view (through its view reference) again afterwards, since a view can only be released once.

And if we only release before creation, the whole threading interaction model would be greatly simplified.
[GitHub] [flink] curcur commented on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery
curcur commented on pull request #13648: URL: https://github.com/apache/flink/pull/13648#issuecomment-714211577

> * Visibility in normal case: none of the fields written in `releaseView` are `volatile`. So in normal case (`t1:release` then `t2:createReadView`) `t2` can see some inconsistent state. For example, `readView == null`, but `isPartialBufferCleanupRequired == false`. Right?
> Maybe call `releaseView()` from `createReadView()` unconditionally?

That's true. In that case, should we skip `releaseView()` during downstream task cancelation, so that `releaseView()` is done only before creating a new view?

> * Overwrites when release is slow: won't `t1` overwrite changes to `PipelinedSubpartition` made already by `t2`? For example, reset `sequenceNumber` after `t2` has sent some data?
> Maybe `PipelinedSubpartition.readerView` should be `AtomicReference` and then we can guard `PipelinedApproximateSubpartition.releaseView()` by CAS on it?

I think this is the same question I answered in the write-up. In short, it won't be possible, because a view can only be released once, and this is guarded by the release flag of the view. Details are quoted below.

- What if netty thread1 releases the view after netty thread2 recreates the view?
  Thread2 releases the view that thread1 holds the reference to before creating a new view. Thread1 cannot release the old view (through its view reference) again afterwards, since a view can only be released once.
[GitHub] [flink] flinkbot commented on pull request #13738: [FLINK-19757][flink-table-common] fix the local datetime inconsistent…
flinkbot commented on pull request #13738: URL: https://github.com/apache/flink/pull/13738#issuecomment-714211013

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

## Automated Checks

Last check on commit 065898f3ea61da7479e614517c03acfbce10ebfb (Thu Oct 22 04:13:34 UTC 2020)

**Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up to date!
* **This pull request references an unassigned [Jira ticket](https://issues.apache.org/jira/browse/FLINK-19757).** According to the [code contribution guide](https://flink.apache.org/contributing/contribute-code.html), tickets need to be assigned before starting with the implementation work.

Mention the bot in a comment to re-run the automated checks.

## Review Progress

* ❓ 1. The [description] looks good.
* ❓ 2. There is [consensus] that the contribution should go into Flink.
* ❓ 3. Needs [attention] from.
* ❓ 4. The change fits into the overall [architecture].
* ❓ 5. Overall code [quality] is good.

Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.

The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:
- `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until `architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
[jira] [Updated] (FLINK-19757) TimeStampData can cause time inconsistent problem
[ https://issues.apache.org/jira/browse/FLINK-19757?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-19757:
---
Labels: pull-request-available (was: )

> TimeStampData can cause time inconsistent problem
> -
>
> Key: FLINK-19757
> URL: https://issues.apache.org/jira/browse/FLINK-19757
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Runtime
> Affects Versions: 1.11.1
> Reporter: xiaogang zhou
> Priority: Major
> Labels: pull-request-available
>
> When we check the JDK LocalDateTime code, we find the following:
>
> {code:java}
> public static LocalDateTime ofEpochSecond(long epochSecond, int nanoOfSecond, ZoneOffset offset) {
>     Objects.requireNonNull(offset, "offset");
>     NANO_OF_SECOND.checkValidValue(nanoOfSecond);
>     long localSecond = epochSecond + offset.getTotalSeconds();  // overflow caught later
>     long localEpochDay = Math.floorDiv(localSecond, SECONDS_PER_DAY);
>     int secsOfDay = (int)Math.floorMod(localSecond, SECONDS_PER_DAY);
>     LocalDate date = LocalDate.ofEpochDay(localEpochDay);
>     LocalTime time = LocalTime.ofNanoOfDay(secsOfDay * NANOS_PER_SECOND + nanoOfSecond);
>     return new LocalDateTime(date, time);
> }
> {code}
>
> The JDK adds `offset.getTotalSeconds()` here, but `toLocalDateTime` in TimeStampData does not add an offset.
>
> I'd like to add `TimeZone.getDefault().getRawOffset()` in `toLocalDateTime()` and subtract `TimeZone.getDefault().getRawOffset()` in `fromLocalDateTime`.
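To make the effect of the offset in `LocalDateTime.ofEpochSecond` concrete, here is a minimal sketch that uses only the JDK. The helpers `toLocal` and `toEpochMillis` are hypothetical names for illustration; they are not Flink's `toLocalDateTime` or `fromLocalDateTime`, and the fixed `+08:00` offset is just an assumed example zone.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class EpochOffsetDemo {

    /** Hypothetical helper: epoch milliseconds to a LocalDateTime at the given offset. */
    static LocalDateTime toLocal(long epochMillis, ZoneOffset offset) {
        long epochSecond = Math.floorDiv(epochMillis, 1000L);
        int nanoOfSecond = (int) Math.floorMod(epochMillis, 1000L) * 1_000_000;
        // ofEpochSecond adds offset.getTotalSeconds() before splitting into date and time.
        return LocalDateTime.ofEpochSecond(epochSecond, nanoOfSecond, offset);
    }

    /** Hypothetical helper: the inverse conversion, which subtracts the same offset. */
    static long toEpochMillis(LocalDateTime dateTime, ZoneOffset offset) {
        return dateTime.toInstant(offset).toEpochMilli();
    }

    public static void main(String[] args) {
        long epochMillis = 0L;
        ZoneOffset plus8 = ZoneOffset.ofHours(8); // assumed example offset

        LocalDateTime utcView = toLocal(epochMillis, ZoneOffset.UTC);
        LocalDateTime localView = toLocal(epochMillis, plus8);

        System.out.println(utcView);                         // 1970-01-01T00:00
        System.out.println(localView);                       // 1970-01-01T08:00
        System.out.println(toEpochMillis(localView, plus8)); // 0
    }
}
```

The round trip only returns the original value when both directions use the same offset. Note also that `TimeZone.getRawOffset()` returns the standard-time offset and does not include daylight saving time, so a conversion built on it may differ from one based on the zone's full rules.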
[GitHub] [flink] flinkbot edited a comment on pull request #13736: [FLINK-19654][python][e2e] Reduce pyflink e2e test parallelism
flinkbot edited a comment on pull request #13736: URL: https://github.com/apache/flink/pull/13736#issuecomment-714183394 ## CI report: * 259c893dcb4b5692df54cfe7739f95d9e34096e6 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8056) * 111ec785929a0742b46ea98408f585aa03314b1d UNKNOWN
[GitHub] [flink] zhougit86 opened a new pull request #13738: [FLINK-19757][flink-table-common] fix the local datetime inconsistent…
zhougit86 opened a new pull request #13738: URL: https://github.com/apache/flink/pull/13738

… problem

## What is the purpose of the change

Fix the time inconsistency problem in TimeStampData.

## Brief change log

- Add the offset in `toLocalDateTime`.
- Subtract the offset in `fromLocalDateTime`.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)
[GitHub] [flink] flinkbot edited a comment on pull request #13729: [FLINK-19644][hive] Support read specific partition of Hive table in temporal join
flinkbot edited a comment on pull request #13729: URL: https://github.com/apache/flink/pull/13729#issuecomment-713673157 ## CI report: * 659b57b42486d3b79197653fec933ce42766388e Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8033) * f028c8d52466fbd6fd5a9cbf3ed85dcbc65fd2ac Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8068)
[GitHub] [flink] flinkbot edited a comment on pull request #13623: [FLINK-19606][table-runtime] Implement streaming window join operator
flinkbot edited a comment on pull request #13623: URL: https://github.com/apache/flink/pull/13623#issuecomment-708132144 ## CI report: * 1de96f52dee7fc493ca0cad827189955520362cc Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8066)
[GitHub] [flink] flinkbot edited a comment on pull request #13653: [FLINK-17528][table] Remove RowData#get() and ArrayData#get() and use FieldGetter and ElementGetter instead
flinkbot edited a comment on pull request #13653: URL: https://github.com/apache/flink/pull/13653#issuecomment-709337488 ## CI report: * cadf9a2a13d3113395f199431881fff216b9a50a Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7974) Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7958) Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8017) * 0c876d6befc487412629e6a1e8883fa5fbeb31e6 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8067)
[GitHub] [flink] flinkbot edited a comment on pull request #13581: [FLINK-17331] Explicitly get the ByteBuf length of all classes which …
flinkbot edited a comment on pull request #13581: URL: https://github.com/apache/flink/pull/13581#issuecomment-706519397 ## CI report: * 1e187203b15559b08d15a0cca949ca15a8a3bafe Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7383) * fcc301fbb0d04ada457d2c39d325ede52cc0db8d UNKNOWN
[jira] [Updated] (FLINK-19757) TimeStampData can cause time inconsistent problem
[ https://issues.apache.org/jira/browse/FLINK-19757?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xiaogang zhou updated FLINK-19757: -- Description: when we check jdk LocalDateTime code,we find that {code:java} // code placeholder public static LocalDateTime ofEpochSecond(long epochSecond, int nanoOfSecond, ZoneOffset offset) { Objects.requireNonNull(offset, "offset"); NANO_OF_SECOND.checkValidValue(nanoOfSecond); long localSecond = epochSecond + offset.getTotalSeconds(); // overflow caught later long localEpochDay = Math.floorDiv(localSecond, SECONDS_PER_DAY); int secsOfDay = (int)Math.floorMod(localSecond, SECONDS_PER_DAY); LocalDate date = LocalDate.ofEpochDay(localEpochDay); LocalTime time = LocalTime.ofNanoOfDay(secsOfDay * NANOS_PER_SECOND + nanoOfSecond); return new LocalDateTime(date, time); } {code} offset.getTotalSeconds() they add the offset, but in the TimeStampData toLocalDateTime, we don't add a offset. I'd like to add a TimeZone.getDefault().getRawOffset() in the toLocalDateTime() and minus a TimeZone.getDefault().getRawOffset() in the fromLocalDateTime was: when we check jdk LocalDateTime code,we find that public static LocalDateTime ofEpochSecond(long epochSecond, int nanoOfSecond, ZoneOffset offset) { Objects.requireNonNull(offset, "offset"); NANO_OF_SECOND.checkValidValue(nanoOfSecond); long localSecond = epochSecond + offset.getTotalSeconds(); // overflow caught later long localEpochDay = Math.floorDiv(localSecond, SECONDS_PER_DAY); int secsOfDay = (int)Math.floorMod(localSecond, SECONDS_PER_DAY); LocalDate date = LocalDate.ofEpochDay(localEpochDay); LocalTime time = LocalTime.ofNanoOfDay(secsOfDay * NANOS_PER_SECOND + nanoOfSecond); return new LocalDateTime(date, time); } offset.getTotalSeconds() they add the offset, but in the TimeStampData toLocalDateTime, we don't add a offset. 
I'd like to add a TimeZone.getDefault().getRawOffset() in the toLocalDateTime() and minus a TimeZone.getDefault().getRawOffset() in the fromLocalDateTime > TimeStampData can cause time inconsistent problem > - > > Key: FLINK-19757 > URL: https://issues.apache.org/jira/browse/FLINK-19757 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Runtime >Affects Versions: 1.11.1 >Reporter: xiaogang zhou >Priority: Major > > when we check jdk LocalDateTime code,we find that > > {code:java} > // code placeholder > public static LocalDateTime ofEpochSecond(long epochSecond, int nanoOfSecond, > ZoneOffset offset) { > Objects.requireNonNull(offset, "offset"); > NANO_OF_SECOND.checkValidValue(nanoOfSecond); > long localSecond = epochSecond + offset.getTotalSeconds(); // overflow > caught later > long localEpochDay = Math.floorDiv(localSecond, SECONDS_PER_DAY); > int secsOfDay = (int)Math.floorMod(localSecond, SECONDS_PER_DAY); > LocalDate date = LocalDate.ofEpochDay(localEpochDay); > LocalTime time = LocalTime.ofNanoOfDay(secsOfDay * NANOS_PER_SECOND + > nanoOfSecond); > return new LocalDateTime(date, time); > } > {code} > > offset.getTotalSeconds() they add the offset, but in the TimeStampData > toLocalDateTime, we don't add a offset. > > I'd like to add a TimeZone.getDefault().getRawOffset() in the > toLocalDateTime() > and minus a TimeZone.getDefault().getRawOffset() in the > fromLocalDateTime -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-19757) TimeStampData can cause time inconsistent problem
[ https://issues.apache.org/jira/browse/FLINK-19757?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xiaogang zhou updated FLINK-19757: -- Description: when we check jdk LocalDateTime code,we find that public static LocalDateTime ofEpochSecond(long epochSecond, int nanoOfSecond, ZoneOffset offset) { Objects.requireNonNull(offset, "offset"); NANO_OF_SECOND.checkValidValue(nanoOfSecond); long localSecond = epochSecond + offset.getTotalSeconds(); // overflow caught later long localEpochDay = Math.floorDiv(localSecond, SECONDS_PER_DAY); int secsOfDay = (int)Math.floorMod(localSecond, SECONDS_PER_DAY); LocalDate date = LocalDate.ofEpochDay(localEpochDay); LocalTime time = LocalTime.ofNanoOfDay(secsOfDay * NANOS_PER_SECOND + nanoOfSecond); return new LocalDateTime(date, time); } offset.getTotalSeconds() they add the offset, but in the TimeStampData toLocalDateTime, we don't add a offset. I'd like to add a TimeZone.getDefault().getRawOffset() in the toLocalDateTime() and minus a TimeZone.getDefault().getRawOffset() in the fromLocalDateTime was: when we check jdk LocalDateTime code,we find that public static LocalDateTime ofEpochSecond(long epochSecond, int nanoOfSecond, ZoneOffset offset) { Objects.requireNonNull(offset, "offset"); NANO_OF_SECOND.checkValidValue(nanoOfSecond); long localSecond = epochSecond + offset.getTotalSeconds(); // overflow caught later long localEpochDay = Math.floorDiv(localSecond, SECONDS_PER_DAY); int secsOfDay = (int)Math.floorMod(localSecond, SECONDS_PER_DAY); LocalDate date = LocalDate.ofEpochDay(localEpochDay); LocalTime time = LocalTime.ofNanoOfDay(secsOfDay * NANOS_PER_SECOND + nanoOfSecond); return new LocalDateTime(date, time); } offset.getTotalSeconds() they add the offset, but in the TimeStampData toLocalDateTime, we don't add a offset. 
I'd like to add a TimeZone.getDefault().getRawOffset() in the toLocalDateTime() and minus a TimeZone.getDefault().getRawOffset() in the fromLocalDateTime > TimeStampData can cause time inconsistent problem > - > > Key: FLINK-19757 > URL: https://issues.apache.org/jira/browse/FLINK-19757 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Runtime >Affects Versions: 1.11.1 >Reporter: xiaogang zhou >Priority: Major > > when we check jdk LocalDateTime code,we find that > public static LocalDateTime ofEpochSecond(long epochSecond, int nanoOfSecond, > ZoneOffset offset) > { > Objects.requireNonNull(offset, "offset"); > NANO_OF_SECOND.checkValidValue(nanoOfSecond); > long localSecond = epochSecond + offset.getTotalSeconds(); > // overflow caught later long localEpochDay = Math.floorDiv(localSecond, > SECONDS_PER_DAY); > int secsOfDay = (int)Math.floorMod(localSecond, SECONDS_PER_DAY); > LocalDate date = LocalDate.ofEpochDay(localEpochDay); > LocalTime time = LocalTime.ofNanoOfDay(secsOfDay * NANOS_PER_SECOND + > nanoOfSecond); > return new LocalDateTime(date, time); } > > offset.getTotalSeconds() they add the offset, but in the TimeStampData > toLocalDateTime, we don't add a offset. > > I'd like to add a TimeZone.getDefault().getRawOffset() in the > toLocalDateTime() > and minus a TimeZone.getDefault().getRawOffset() in the > fromLocalDateTime -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19757) TimeStampData can cause time inconsistent problem
xiaogang zhou created FLINK-19757:
-
Summary: TimeStampData can cause time inconsistent problem
Key: FLINK-19757
URL: https://issues.apache.org/jira/browse/FLINK-19757
Project: Flink
Issue Type: Improvement
Components: Table SQL / Runtime
Affects Versions: 1.11.1
Reporter: xiaogang zhou

When we check the JDK LocalDateTime code, we find the following:

    public static LocalDateTime ofEpochSecond(long epochSecond, int nanoOfSecond, ZoneOffset offset) {
        Objects.requireNonNull(offset, "offset");
        NANO_OF_SECOND.checkValidValue(nanoOfSecond);
        long localSecond = epochSecond + offset.getTotalSeconds();  // overflow caught later
        long localEpochDay = Math.floorDiv(localSecond, SECONDS_PER_DAY);
        int secsOfDay = (int)Math.floorMod(localSecond, SECONDS_PER_DAY);
        LocalDate date = LocalDate.ofEpochDay(localEpochDay);
        LocalTime time = LocalTime.ofNanoOfDay(secsOfDay * NANOS_PER_SECOND + nanoOfSecond);
        return new LocalDateTime(date, time);
    }

The JDK adds `offset.getTotalSeconds()` here, but `toLocalDateTime` in TimeStampData does not add an offset.

I'd like to add `TimeZone.getDefault().getRawOffset()` in `toLocalDateTime()` and subtract `TimeZone.getDefault().getRawOffset()` in `fromLocalDateTime`.
[GitHub] [flink] curcur edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery
curcur edited a comment on pull request #13648: URL: https://github.com/apache/flink/pull/13648#issuecomment-713341581

Thanks @rkhachatryan so much for the great question on how different threads access the same view (in other words, how the netty threads and the task thread interact on a view). Thanks @zhijiangW a lot for explaining the underlying threading model in our offline discussion, and for the great insights.

I've spent some time on the different paths by which the view is created and released in different scenarios. Here is what I think is happening. @zhijiangW, please correct me if I miss anything, thanks!

For a streaming job, each `view` is created through a PartitionRequest handled by PartitionRequestServerHandler (on one of the netty worker threads). The created view is owned by the subPartition, and a reference to the view is returned and bound to a reader. The reader belongs to PartitionRequestServerHandler.

`PipelinedApproximateSubpartition#releaseView` is used in two places:
1. PipelinedApproximateSubpartition#createView (from a netty thread, which can see the current version of the view belonging to the subpartition).
2. PipelinedApproximateSubpartition#releaseAllResources, which is called from two places:
   - from the `reader`: the reader uses its own reference to the view to release resources. This happens on downstream task cancelation.
   - from `subPartition release`: this is fine, since the subpartition releases its own view.

Two questions:
1. Does the old view continue to read data if it is not disposed of successfully before the new view is created?
   - No, because the reader and the view are removed upon the downstream task's cancelation request.
2. What if netty thread1 releases the view after netty thread2 recreates the view?
   - Thread2 releases the view that thread1 holds the reference to before creating a new view. Thread1 cannot release the old view (through its view reference) again afterwards, since a view can only be released once.
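The "a view can only be released once" argument in the write-up can be sketched with a release flag that is flipped atomically. This is a minimal illustration under assumptions, not the actual Flink view class: `ReleaseOnceView`, `release()` and `cleanupRuns()` are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class ReleaseOnceView {
    // Release flag: only the first caller flips it from false to true.
    private final AtomicBoolean released = new AtomicBoolean(false);
    // Counts how many times the cleanup logic actually ran.
    private final AtomicInteger cleanupRuns = new AtomicInteger();

    /** Runs the cleanup at most once; later calls are no-ops that return false. */
    boolean release() {
        if (!released.compareAndSet(false, true)) {
            return false; // already released, possibly by another thread
        }
        cleanupRuns.incrementAndGet(); // stands in for the real resource cleanup
        return true;
    }

    int cleanupRuns() {
        return cleanupRuns.get();
    }

    public static void main(String[] args) throws InterruptedException {
        ReleaseOnceView oldView = new ReleaseOnceView();

        // Thread2 releases the old view before it would create a new one;
        // thread1 still holds a reference to the old view and releases it as well.
        Thread thread2 = new Thread(oldView::release, "netty-thread-2");
        Thread thread1 = new Thread(oldView::release, "netty-thread-1");
        thread2.start();
        thread1.start();
        thread2.join();
        thread1.join();

        System.out.println(oldView.cleanupRuns()); // 1
    }
}
```

Whichever thread wins the `compareAndSet`, the cleanup runs exactly once, which is the property the second answer above relies on.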
[jira] [Commented] (FLINK-19754) Cannot have more than one execute() or executeAsync() call in a single environment.
[ https://issues.apache.org/jira/browse/FLINK-19754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17218721#comment-17218721 ] Jark Wu commented on FLINK-19754: - Could you share the job graph in Web UI? > Cannot have more than one execute() or executeAsync() call in a single > environment. > --- > > Key: FLINK-19754 > URL: https://issues.apache.org/jira/browse/FLINK-19754 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.11.2 >Reporter: little-tomato >Priority: Major > > I run this code on my Standalone Cluster. When i submit the job,the error log > is as follows: > {code} > 2020-10-20 11:53:42,969 WARN > org.apache.flink.client.deployment.application.DetachedApplicationRunner [] - > Could not execute application: > org.apache.flink.client.program.ProgramInvocationException: The main method > caused an error: Cannot have more than one execute() or executeAsync() call > in a single environment. > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302) > ~[flink-clients_2.12-1.11.0.jar:1.11.0] > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) > ~[flink-clients_2.12-1.11.0.jar:1.11.0] > at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) > ~[flink-clients_2.12-1.11.0.jar:1.11.0] > at > org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78) > ~[flink-clients_2.12-1.11.0.jar:1.11.0] > at > org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:67) > ~[flink-clients_2.12-1.11.0.jar:1.11.0] > at > org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:100) > ~[flink-dist_2.12-1.11.2.jar:1.11.2] > at > java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) > [?:1.8.0_221] > at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > [?:1.8.0_221] > at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_221] > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > [?:1.8.0_221] > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > [?:1.8.0_221] > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > [?:1.8.0_221] > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > [?:1.8.0_221] > at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221] > Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot have more > than one execute() or executeAsync() call in a single environment. > at > org.apache.flink.client.program.StreamContextEnvironment.validateAllowedExecution(StreamContextEnvironment.java:139) > ~[flink-clients_2.12-1.11.0.jar:1.11.0] > at > org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:127) > ~[flink-clients_2.12-1.11.0.jar:1.11.0] > at > org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76) > ~[flink-clients_2.12-1.11.0.jar:1.11.0] > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1697) > ~[flink-dist_2.12-1.11.2.jar:1.11.2] > at cn.cuiot.dmp.ruleengine.job.RuleEngineJob.main(RuleEngineJob.java:556) > ~[?:?] 
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_221] > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > ~[?:1.8.0_221] > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > ~[?:1.8.0_221] > at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_221] > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) > ~[flink-clients_2.12-1.11.0.jar:1.11.0] > {code} > my code is: > {code:java} > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > EnvironmentSettings bsSettings = > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); > StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings); > ... > FlinkKafkaConsumer myConsumer = new > FlinkKafkaConsumer("kafkatopic", new SimpleStringSchema(), > properties); > myConsumer.setStartFromLatest(); > DataStream kafkaDataStream = env.addSource(myConsumer); > SingleOutputStreamOperator sourceStream = kafkaDataStream > .map(new MapFunction() > { ... } > ); > DataStream
[GitHub] [flink] flinkbot edited a comment on pull request #13737: [hotfix] Fix DeadlockBreakupTest due to Calcite upgrade
flinkbot edited a comment on pull request #13737: URL: https://github.com/apache/flink/pull/13737#issuecomment-714194471 ## CI report: * 20fd494ddd03e04ffe024ee24e4a31941950f787 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8062)
[GitHub] [flink] flinkbot edited a comment on pull request #13729: [FLINK-19644][hive] Support read specific partition of Hive table in temporal join
flinkbot edited a comment on pull request #13729: URL: https://github.com/apache/flink/pull/13729#issuecomment-713673157 ## CI report: * 659b57b42486d3b79197653fec933ce42766388e Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8033) * f028c8d52466fbd6fd5a9cbf3ed85dcbc65fd2ac UNKNOWN
[jira] [Commented] (FLINK-19754) Cannot have more than one execute() or executeAsync() call in a single environment.
[ https://issues.apache.org/jira/browse/FLINK-19754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17218719#comment-17218719 ]
little-tomato commented on FLINK-19754:
---
I tried it, but the StreamExecutionEnvironment DataStream code does not take effect.
> Cannot have more than one execute() or executeAsync() call in a single environment.
> ---
>
> Key: FLINK-19754
> URL: https://issues.apache.org/jira/browse/FLINK-19754
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Affects Versions: 1.11.2
> Reporter: little-tomato
> Priority: Major
>
> I ran this code on my Standalone Cluster. When I submit the job, the error log is as follows:
> {code}
> 2020-10-20 11:53:42,969 WARN org.apache.flink.client.deployment.application.DetachedApplicationRunner [] - Could not execute application:
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Cannot have more than one execute() or executeAsync() call in a single environment.
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302) ~[flink-clients_2.12-1.11.0.jar:1.11.0]
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) ~[flink-clients_2.12-1.11.0.jar:1.11.0]
>     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) ~[flink-clients_2.12-1.11.0.jar:1.11.0]
>     at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78) ~[flink-clients_2.12-1.11.0.jar:1.11.0]
>     at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:67) ~[flink-clients_2.12-1.11.0.jar:1.11.0]
>     at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:100) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) [?:1.8.0_221]
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_221]
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_221]
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_221]
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_221]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_221]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_221]
>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
> Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot have more than one execute() or executeAsync() call in a single environment.
>     at org.apache.flink.client.program.StreamContextEnvironment.validateAllowedExecution(StreamContextEnvironment.java:139) ~[flink-clients_2.12-1.11.0.jar:1.11.0]
>     at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:127) ~[flink-clients_2.12-1.11.0.jar:1.11.0]
>     at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76) ~[flink-clients_2.12-1.11.0.jar:1.11.0]
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1697) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>     at cn.cuiot.dmp.ruleengine.job.RuleEngineJob.main(RuleEngineJob.java:556) ~[?:?]
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_221]
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_221]
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_221]
>     at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_221]
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) ~[flink-clients_2.12-1.11.0.jar:1.11.0]
> {code}
> My code is:
> {code:java}
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
> ...
> FlinkKafkaConsumer myConsumer = new FlinkKafkaConsumer("kafkatopic", new SimpleStringSchema(), properties);
> myConsumer.setStartFromLatest();
> DataStream kafkaDataStream = env.addSource(myConsumer);
> SingleOutputStreamOperator sourceStream = kafkaDataStream
> .map(new
[GitHub] [flink] curcur edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery
curcur edited a comment on pull request #13648: URL: https://github.com/apache/flink/pull/13648#issuecomment-713341581

Thanks @rkhachatryan so much for the great question on how different threads access the same view (in other words, the threading model in netty plus the task thread's interaction with the view). Thanks @zhijiangW a lot for explaining the underlying threading model in our offline discussion, and for the great insights.

I've spent some time on the different paths through which the view is created/released in different scenarios; here is what I think is happening. @zhijiangW, please correct me if I miss anything, thanks!

For a streaming job, each `view` is created through a PartitionRequest handled by PartitionRequestServerHandler (on one of the netty worker threads). The created view is owned by the subPartition, and a reference to the view is returned and bound to a reader. The reader belongs to PartitionRequestServerHandler.

`PipelinedApproximateSubpartition#releaseView` is used in two places:
1. PipelinedApproximateSubpartition#createView (from a Netty thread; it can see the current version of the view belonging to the subpartition)
2. PipelinedApproximateSubpartition#releaseAllResources, which is called from two places:
 - one from the `reader`: the reader uses its own reference to the view to release resources. This comes from downstream task cancelation.
 - one from `subPartition release`. This is fine; the subpartition releases its own view.

Two questions:
1. Does the old view continue to read data if it is not disposed of successfully before the new view is created?
 - No, because the reader and the view would be removed upon the downstream task's cancelation request.
2. What if netty thread1 releases the view after netty thread2 recreates the view?
 - Thread2 releases the view that thread1 holds the reference on before creating a new view. Thread1 cannot release the view again afterwards, since a view can only be released once.
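The answer to the second question rests on release being a one-shot operation. Below is a minimal sketch of that idea, not the code from the pull request: `ReleaseOnceView` and `ViewOwner` are hypothetical stand-ins for the view and the subpartition, and the use of `AtomicBoolean` is an assumption made only for illustration.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a subpartition view; not Flink's actual class.
class ReleaseOnceView {
    private final AtomicBoolean released = new AtomicBoolean(false);

    /** Returns true only for the single call that actually performs the release. */
    boolean release() {
        // compareAndSet succeeds for exactly one caller, even under contention.
        return released.compareAndSet(false, true);
    }

    boolean isReleased() {
        return released.get();
    }
}

// Hypothetical stand-in for the subpartition that owns the current view.
class ViewOwner {
    private ReleaseOnceView current;

    /** Releases the previous view (if any) before handing out a new one. */
    synchronized ReleaseOnceView createView() {
        if (current != null) {
            current.release();
        }
        current = new ReleaseOnceView();
        return current;
    }
}

class ReleaseOnceDemo {
    public static void main(String[] args) {
        ViewOwner owner = new ViewOwner();
        ReleaseOnceView held = owner.createView();  // reference held by "thread1"
        ReleaseOnceView fresh = owner.createView(); // "thread2" recreates the view
        // The late release from "thread1" is a no-op on the old view and
        // cannot touch the new one, because it only holds the old reference.
        System.out.println("late release did work: " + held.release());
        System.out.println("new view released: " + fresh.isReleased());
    }
}
```

In this sketch the late caller can only act on the stale reference it holds, so the freshly created view is never affected.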
[GitHub] [flink] flinkbot edited a comment on pull request #13653: [FLINK-17528][table] Remove RowData#get() and ArrayData#get() and use FieldGetter and ElementGetter instead
flinkbot edited a comment on pull request #13653: URL: https://github.com/apache/flink/pull/13653#issuecomment-709337488
## CI report:
* cadf9a2a13d3113395f199431881fff216b9a50a Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7974) Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7958) Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8017)
* 0c876d6befc487412629e6a1e8883fa5fbeb31e6 UNKNOWN
[GitHub] [flink] flinkbot edited a comment on pull request #13623: [FLINK-19606][table-runtime] Implement streaming window join operator
flinkbot edited a comment on pull request #13623: URL: https://github.com/apache/flink/pull/13623#issuecomment-708132144
## CI report:
* 15cc49ee7ecb3060ec52ab5545f4b6fae58c25de Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7559)
* 1de96f52dee7fc493ca0cad827189955520362cc UNKNOWN
[GitHub] [flink] leonardBang commented on a change in pull request #13729: [FLINK-19644][hive] Support read specific partition of Hive table in temporal join
leonardBang commented on a change in pull request #13729: URL: https://github.com/apache/flink/pull/13729#discussion_r509860215
## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveLookupFunction.java
## @@ -47,29 +59,46 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import static org.apache.flink.connectors.hive.HiveTableFactory.LOOKUP_JOIN_CACHE_TTL;
+import static org.apache.flink.connectors.hive.HiveTableFactory.LOOKUP_JOIN_PARTITION;
+import static org.apache.flink.table.catalog.hive.util.HivePartitionUtils.getPartitionByPartitionSpecs;
+import static org.apache.flink.table.catalog.hive.util.HivePartitionUtils.getTableProps;
+import static org.apache.flink.table.catalog.hive.util.HivePartitionUtils.toHiveTablePartition;
+import static org.apache.flink.table.catalog.hive.util.HivePartitionUtils.validateAndParsePartitionSpecs;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
- * Lookup table function for filesystem connector tables.
+ * Lookup table function for Hive connector tables.
  */
-public class FileSystemLookupFunction extends TableFunction {
+public class HiveLookupFunction extends TableFunction {
Review comment: `FileSystemLookupFunction` is only used in HiveTableSource.
[jira] [Closed] (FLINK-18971) Support to mount kerberos conf as ConfigMap and Keytab as Secrete
[ https://issues.apache.org/jira/browse/FLINK-18971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Xintong Song closed FLINK-18971.
Resolution: Done
Done via
* master: dd481134f24e8fd1ce65e827e0c5c6350c5be9b2
> Support to mount kerberos conf as ConfigMap and Keytab as Secrete
> ---
>
> Key: FLINK-18971
> URL: https://issues.apache.org/jira/browse/FLINK-18971
> Project: Flink
> Issue Type: Sub-task
> Components: Deployment / Kubernetes
> Reporter: Yangze Guo
> Assignee: Yangze Guo
> Priority: Major
> Labels: pull-request-available
>
> Currently, if users want to enable Kerberos authentication, they need to build a custom image with the keytab and krb5 conf file. To improve usability, we need to create a ConfigMap and a Secret for the krb5 conf and keytab when needed.
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-18971) Support to mount kerberos conf as ConfigMap and Keytab as Secrete
[ https://issues.apache.org/jira/browse/FLINK-18971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Xintong Song updated FLINK-18971:
---
Fix Version/s: 1.12.0
> Support to mount kerberos conf as ConfigMap and Keytab as Secrete
> ---
>
> Key: FLINK-18971
> URL: https://issues.apache.org/jira/browse/FLINK-18971
> Project: Flink
> Issue Type: Sub-task
> Components: Deployment / Kubernetes
> Reporter: Yangze Guo
> Assignee: Yangze Guo
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.12.0
>
> Currently, if users want to enable Kerberos authentication, they need to build a custom image with the keytab and krb5 conf file. To improve usability, we need to create a ConfigMap and a Secret for the krb5 conf and keytab when needed.
[GitHub] [flink] xintongsong closed pull request #13255: [FLINK-18971] Support to mount kerberos conf as ConfigMap and Keytab …
xintongsong closed pull request #13255: URL: https://github.com/apache/flink/pull/13255
[GitHub] [flink] JingsongLi commented on a change in pull request #13729: [FLINK-19644][hive] Support read specific partition of Hive table in temporal join
JingsongLi commented on a change in pull request #13729: URL: https://github.com/apache/flink/pull/13729#discussion_r509859082
## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveLookupFunction.java
## @@ -47,29 +59,46 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import static org.apache.flink.connectors.hive.HiveTableFactory.LOOKUP_JOIN_CACHE_TTL;
+import static org.apache.flink.connectors.hive.HiveTableFactory.LOOKUP_JOIN_PARTITION;
+import static org.apache.flink.table.catalog.hive.util.HivePartitionUtils.getPartitionByPartitionSpecs;
+import static org.apache.flink.table.catalog.hive.util.HivePartitionUtils.getTableProps;
+import static org.apache.flink.table.catalog.hive.util.HivePartitionUtils.toHiveTablePartition;
+import static org.apache.flink.table.catalog.hive.util.HivePartitionUtils.validateAndParsePartitionSpecs;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
- * Lookup table function for filesystem connector tables.
+ * Lookup table function for Hive connector tables.
  */
-public class FileSystemLookupFunction extends TableFunction {
+public class HiveLookupFunction extends TableFunction {
Review comment: Why do this? Why shouldn't Filesystem also have this lookup capability?
[jira] [Closed] (FLINK-19733) Make fast_operation and slow_operation produce functions consistent
[ https://issues.apache.org/jira/browse/FLINK-19733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dian Fu closed FLINK-19733.
---
Fix Version/s: 1.12.0
Assignee: Huang Xingbo
Resolution: Fixed
Merged to master via d5e81688d85b8f24161a4397c8ef8dfac0bbcd51
> Make fast_operation and slow_operation produce functions consistent
> ---
>
> Key: FLINK-19733
> URL: https://issues.apache.org/jira/browse/FLINK-19733
> Project: Flink
> Issue Type: Improvement
> Components: API / Python
> Reporter: Huang Xingbo
> Assignee: Huang Xingbo
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.12.0
>
> The function generated by slow_operation relies on characteristics of Python syntax. In order to better reconstruct the Python operation, we need to keep the functions generated by fast_operation and slow_operation consistent.
[GitHub] [flink] wangyang0918 commented on a change in pull request #13644: [FLINK-19542][k8s]Implement LeaderElectionService and LeaderRetrievalService based on Kubernetes API
wangyang0918 commented on a change in pull request #13644: URL: https://github.com/apache/flink/pull/13644#discussion_r509857812
## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClient.java
## @@ -104,6 +106,67 @@ KubernetesWatch watchPodsAndDoCallback(
     Map labels,
     WatchCallbackHandler podCallbackHandler);
+  /**
+   * Create the ConfigMap with specified content. If the ConfigMap already exists, a FlinkRuntimeException will be
+   * thrown.
+   *
+   * @param configMap ConfigMap.
+   *
+   * @return Return the ConfigMap create future.
+   */
+  CompletableFuture createConfigMap(KubernetesConfigMap configMap);
+
+  /**
+   * Get the ConfigMap with specified name.
+   *
+   * @param name ConfigMap name.
+   *
+   * @return Return the ConfigMap, or empty if the ConfigMap does not exist.
+   */
+  Optional getConfigMap(String name);
+
+  /**
+   * Update an existing ConfigMap with the data. Benefit from https://kubernetes.io/docs/reference/using-api/api-concepts/#resource-versions>
+   * resource version and combined with {@link #getConfigMap(String)}, we could perform a get-check-and-update
+   * transactional operation. Since concurrent modification could happen on a same ConfigMap,
+   * the update operation may fail. We need to retry internally. The max retry attempts could be
+   * configured via {@link org.apache.flink.kubernetes.configuration.KubernetesConfigOptions#KUBERNETES_TRANSACTIONAL_OPERATION_MAX_RETRIES}.
+   *
+   * @param configMapName ConfigMap to be replaced with.
+   * @param function Function to be applied to the obtained ConfigMap and get a new updated one. If the returned
Review comment: For example, we serialize, deserialize, or discard state for the job graph store or completed checkpoints, and any of these could throw an `Exception`. I think we could retry in such a situation.
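The get-check-and-update operation described in the Javadoc above can be illustrated with a small in-memory sketch. This is not the Kubernetes client or the code under review: `Versioned` and `VersionedStore` are hypothetical names, and an `AtomicReference` compare-and-set stands in for the server-side resource-version check.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

// Hypothetical record standing in for a ConfigMap plus its resource version.
final class Versioned {
    final long version;
    final String data;

    Versioned(long version, String data) {
        this.version = version;
        this.data = data;
    }
}

// Hypothetical in-memory store; a real API server would check the version itself.
class VersionedStore {
    private final AtomicReference<Versioned> ref =
            new AtomicReference<>(new Versioned(0, ""));

    Versioned get() {
        return ref.get();
    }

    /**
     * Reads the current value, applies the function, and writes the result only
     * if nobody else wrote in between. Retries up to maxRetries times on conflict.
     * Returns false when the function declines to update (empty Optional).
     */
    boolean checkAndUpdate(Function<String, Optional<String>> function, int maxRetries) {
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            Versioned seen = ref.get();
            Optional<String> updated = function.apply(seen.data);
            if (!updated.isPresent()) {
                return false; // the check did not pass, nothing is written
            }
            Versioned next = new Versioned(seen.version + 1, updated.get());
            if (ref.compareAndSet(seen, next)) {
                return true; // no concurrent modification since the read
            }
            // Another writer won the race; loop to re-read and try again.
        }
        throw new IllegalStateException("Gave up after " + maxRetries + " retries");
    }
}
```

The sketch retries only on write conflicts; whether exceptions thrown by the function itself should also trigger a retry is exactly the point under discussion in the review comment.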
[jira] [Closed] (FLINK-19734) Replace 'collection' connector by 'values' connector for temporal join plan tests
[ https://issues.apache.org/jira/browse/FLINK-19734?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jark Wu closed FLINK-19734.
---
Resolution: Fixed
Fixed in master: fc91b0830384e10b87ff0d8ab05258e3f89a8d1b
> Replace 'collection' connector by 'values' connector for temporal join plan tests
> ---
>
> Key: FLINK-19734
> URL: https://issues.apache.org/jira/browse/FLINK-19734
> Project: Flink
> Issue Type: Sub-task
> Components: Table SQL / Planner, Table SQL / Runtime
> Reporter: Leonard Xu
> Assignee: Jark Wu
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.12.0
>
> Currently, both the COLLECTION and VALUES connectors are `LookupTableSource`; we can add a non-lookup table source connector to cover scan-only sources.
[GitHub] [flink] dianfu closed pull request #13713: [FLINK-19733][python] Make fast_operation and slow_operation produce functions consistent
dianfu closed pull request #13713: URL: https://github.com/apache/flink/pull/13713
[GitHub] [flink] wuchong merged pull request #13708: [FLINK-19734][table-planner-blink] Replace 'collection' connector by 'values' connector for temporal join plan tests
wuchong merged pull request #13708: URL: https://github.com/apache/flink/pull/13708
[GitHub] [flink] wuchong commented on pull request #13708: [FLINK-19734][table-planner-blink] Replace 'collection' connector by 'values' connector for temporal join plan tests
wuchong commented on pull request #13708: URL: https://github.com/apache/flink/pull/13708#issuecomment-714197085 Thanks for the review. Merging...
[GitHub] [flink] wangyang0918 commented on a change in pull request #13644: [FLINK-19542][k8s]Implement LeaderElectionService and LeaderRetrievalService based on Kubernetes API
wangyang0918 commented on a change in pull request #13644: URL: https://github.com/apache/flink/pull/13644#discussion_r509856352
## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
## @@ -219,6 +230,71 @@ public KubernetesWatch watchPodsAndDoCallback(
       .watch(new KubernetesPodsWatcher(podCallbackHandler)));
   }
+  @Override
+  public CompletableFuture createConfigMap(KubernetesConfigMap configMap) {
+    final String configMapName = configMap.getName();
+    return CompletableFuture.runAsync(
+      () -> this.internalClient.configMaps().inNamespace(namespace).create(configMap.getInternalResource()),
+      kubeClientExecutorService)
+      .whenComplete((ignored, throwable) -> {
+        if (throwable != null) {
+          throw new FlinkRuntimeException("Failed to create ConfigMap " + configMapName, throwable);
+        }
+      });
+  }
+
+  @Override
+  public Optional getConfigMap(String name) {
+    final ConfigMap configMap = this.internalClient.configMaps().inNamespace(namespace).withName(name).get();
+    return configMap == null ? Optional.empty() : Optional.of(new KubernetesConfigMap(configMap));
+  }
+
+  @Override
+  public CompletableFuture checkAndUpdateConfigMap(
+    String configMapName,
+    FunctionWithException, ?> function) {
+    return FutureUtils.retry(
+      () -> CompletableFuture.supplyAsync(
+        () -> getConfigMap(configMapName)
+          .map(FunctionUtils.uncheckedFunction(configMap -> {
+            final boolean updated = function.apply(configMap).map(
+              updatedConfigMap -> {
+                this.internalClient.configMaps()
+                  .inNamespace(namespace)
+                  .createOrReplace(updatedConfigMap.getInternalResource());
+                return true;
+              }).orElse(false);
+            if (!updated) {
+              LOG.warn("Trying to update ConfigMap {} to {} without checking pass, ignoring.",
+                configMap.getName(), configMap.getData());
+            }
+            return updated;
+          }))
+          .orElseThrow(
+            () -> new FlinkRuntimeException("ConfigMap " + configMapName + " not exists.")),
Review comment: What I mean is that the ConfigMap could be created in the `KubernetesLeaderElectionService#Watcher`. So even if we get an `Optional.empty()` the first time, we could get the correct ConfigMap by retrying. I will add two tests here.
* The ConfigMap never exists and the retry fails
* The ConfigMap exists from the very beginning and the retry succeeds
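The point about retrying after an initial `Optional.empty()` can be shown with a plain-Java sketch. It is an illustration only: `RetryUntilPresent` and `getWithRetry` are hypothetical names, the loop is synchronous and has no backoff, and it does not use Flink's `FutureUtils.retry`.

```java
import java.util.Optional;
import java.util.function.Supplier;

class RetryUntilPresent {
    /** Calls lookup up to (maxRetries + 1) times and returns the first present value. */
    static <T> T getWithRetry(Supplier<Optional<T>> lookup, int maxRetries) {
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            Optional<T> result = lookup.get();
            if (result.isPresent()) {
                return result.get();
            }
            // Resource not there yet (for example, still being created elsewhere); try again.
        }
        throw new IllegalStateException(
                "Resource still missing after " + maxRetries + " retries");
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Hypothetical lookup: the resource only becomes visible on the third call.
        String value = getWithRetry(
                () -> ++calls[0] < 3
                        ? Optional.<String>empty()
                        : Optional.of("leader-config"),
                5);
        System.out.println(value + " found after " + calls[0] + " calls");
    }
}
```

A lookup that never returns a value exhausts the retries and fails, which corresponds to the case where the ConfigMap never exists.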
[jira] [Closed] (FLINK-19655) NPE when using blink planner and TemporalTableFunction after setting IdleStateRetentionTime
[ https://issues.apache.org/jira/browse/FLINK-19655?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jark Wu closed FLINK-19655.
---
Fix Version/s: 1.12.0
Resolution: Fixed
Fixed in master: daeda68edf3466a3f9347c25bdf866ef4f620396
> NPE when using blink planner and TemporalTableFunction after setting IdleStateRetentionTime
> ---
>
> Key: FLINK-19655
> URL: https://issues.apache.org/jira/browse/FLINK-19655
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.10.0, 1.11.0
> Reporter: seunjjs
> Assignee: seunjjs
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.12.0
>
> My code is here:
> {code:java}
> EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);
> tableEnv.getConfig().setIdleStateRetentionTime(Time.seconds(60), Time.seconds(600));
> final Table table = tableEnv.from("tableName");
> final TableFunction function = table.createTemporalTableFunction(
>     temporalTableEntry.getTimeAttribute(),
>     String.join(",", temporalTableEntry.getPrimaryKeyFields()));
> tableEnv.registerFunction(temporalTableEntry.getName(), function);
> {code}
> An NPE was thrown when I executed my program.
> {code:java}
> java.lang.NullPointerException
>     at org.apache.flink.table.runtime.operators.join.temporal.BaseTwoInputStreamOperatorWithStateRetention.registerProcessingCleanupTimer(BaseTwoInputStreamOperatorWithStateRetention.java:109)
>     at org.apache.flink.table.runtime.operators.join.temporal.TemporalProcessTimeJoinOperator.processElement2(TemporalProcessTimeJoinOperator.java:98)
>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processRecord2(StreamTwoInputProcessor.java:145)
>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.lambda$new$1(StreamTwoInputProcessor.java:107)
>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessor.java:362)
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:185)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>     at java.lang.Thread.run(Thread.java:748)
> {code}
>
> When I changed to useOldPlanner, it worked fine. When I debugged the code, I found that BaseTwoInputStreamOperatorWithStateRetention#open was not executed.
> Here is the BaseTwoInputStreamOperatorWithStateRetention#open code.
> {code:java}
> public void open() throws Exception {
>     initializeTimerService();
>     if (stateCleaningEnabled) {
>         ValueStateDescriptor cleanupStateDescriptor =
>             new ValueStateDescriptor<>(CLEANUP_TIMESTAMP, Types.LONG);
>         latestRegisteredCleanupTimer = getRuntimeContext().getState(cleanupStateDescriptor);
>     }
> }
> {code}
> Here is the TemporalProcessTimeJoinOperator#open code.
> {code:java}
> public void open() throws Exception {
>     this.joinCondition = generatedJoinCondition.newInstance(getRuntimeContext().getUserCodeClassLoader());
>     FunctionUtils.setFunctionRuntimeContext(joinCondition, getRuntimeContext());
>     FunctionUtils.openFunction(joinCondition, new Configuration());
>     ValueStateDescriptor rightStateDesc = new ValueStateDescriptor<>("right", rightType);
>     this.rightState = getRuntimeContext().getState(rightStateDesc);
>     this.collector = new TimestampedCollector<>(output);
>     this.outRow = new JoinedRow();
>     // consider watermark from left stream only.
>     super.processWatermark2(Watermark.MAX_WATERMARK);
> }
> {code}
> I compared the code with
[GitHub] [flink] wuchong merged pull request #13675: [FLINK-19655][flink-table-runtime-blink] add super.open() and write unit test for temporal process join
wuchong merged pull request #13675: URL: https://github.com/apache/flink/pull/13675
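The failure mode behind FLINK-19655, an overridden `open()` that never calls `super.open()`, can be reproduced in a few lines of plain Java. The classes below are hypothetical stand-ins and not Flink operators; the `StringBuilder` field is an assumption that plays the role of the state handle the base class sets up in `open()`.

```java
// Hypothetical base operator: open() initialises state that later calls rely on.
class BaseOperator {
    protected StringBuilder cleanupState; // stand-in for a state handle set up in open()

    public void open() {
        cleanupState = new StringBuilder();
    }

    public void registerCleanupTimer(long timestamp) {
        // Throws NullPointerException if the base-class open() never ran.
        cleanupState.append(timestamp);
    }
}

class BrokenJoinOperator extends BaseOperator {
    @Override
    public void open() {
        // Bug: no super.open(), so cleanupState stays null.
    }
}

class FixedJoinOperator extends BaseOperator {
    @Override
    public void open() {
        super.open(); // initialise base-class state before subclass setup
    }
}

class SuperOpenDemo {
    /** Opens the operator, uses it once, and reports whether that use hit an NPE. */
    static boolean throwsNpe(BaseOperator operator) {
        operator.open();
        try {
            operator.registerCleanupTimer(42L);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("broken throws NPE: " + throwsNpe(new BrokenJoinOperator()));
        System.out.println("fixed throws NPE: " + throwsNpe(new FixedJoinOperator()));
    }
}
```

Calling `super.open()` first, as the merged pull request title describes, is what keeps the base-class state initialised in this sketch.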
[GitHub] [flink] flinkbot commented on pull request #13737: [hotfix] Fix DeadlockBreakupTest due to Calcite upgrade
flinkbot commented on pull request #13737: URL: https://github.com/apache/flink/pull/13737#issuecomment-714194471 ## CI report: * 20fd494ddd03e04ffe024ee24e4a31941950f787 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] flinkbot edited a comment on pull request #13711: [FLINK-19721] [flink-runtime] Support exponential backoff retries in RpcGatewayRetriever
flinkbot edited a comment on pull request #13711: URL: https://github.com/apache/flink/pull/13711#issuecomment-713051309 ## CI report: * 6c8fb272a1859f797ccd5cf62cb9392d9b287e9b Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7963) * 1f877b3d8220d0896d3ae433e20a855ae1481411 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8059)
[jira] [Created] (FLINK-19756) Use multi-input optimization by default
Caizhi Weng created FLINK-19756: --- Summary: Use multi-input optimization by default Key: FLINK-19756 URL: https://issues.apache.org/jira/browse/FLINK-19756 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Reporter: Caizhi Weng Fix For: 1.12.0 After the multiple input operator is introduced we should use this optimization by default. This will affect a large amount of plan tests so we will do this in an independent subtask. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot commented on pull request #13737: [hotfix] Fix DeadlockBreakupTest due to Calcite upgrade
flinkbot commented on pull request #13737: URL: https://github.com/apache/flink/pull/13737#issuecomment-714191751 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit 20fd494ddd03e04ffe024ee24e4a31941950f787 (Thu Oct 22 03:06:39 UTC 2020) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] TsReaper opened a new pull request #13737: [hotfix] Fix DeadlockBreakupTest due to Calcite upgrade
TsReaper opened a new pull request #13737: URL: https://github.com/apache/flink/pull/13737 ## What is the purpose of the change This commit fixes the plan test failure in `DeadlockBreakupTest` due to Calcite upgrade. ## Brief change log - Fix tests failure in `DeadlockBreakupTest`. ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable
[jira] [Commented] (FLINK-19629) Avro format cause NullPointException,as null value in MAP type's value type
[ https://issues.apache.org/jira/browse/FLINK-19629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17218707#comment-17218707 ] Jark Wu commented on FLINK-19629: - Fixed in - master: 3c661074b2a597db312ebaf6734c58eb66464ba4 - release-1.11: TODO > Avro format cause NullPointException,as null value in MAP type's value type > > > Key: FLINK-19629 > URL: https://issues.apache.org/jira/browse/FLINK-19629 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.11.2 >Reporter: shizhengchao >Assignee: shizhengchao >Priority: Critical > Labels: pull-request-available > Fix For: 1.12.0, 1.11.3 > > > create table tableA ( > name STRING, > hobly MAP, > phone STRING > ) with ( > 'connector' = 'kafka-0.11', > 'topic' = 'ShizcTest', > 'properties.bootstrap.servers' = 'localhost:9092', > 'properties.group.id' = 'ShizcTest', > 'scan.startup.mode' = 'earliest-offset', > 'format' = 'avro' > ); > if hobly have an null value like this: > {"name": "shizc", "hobly": {"key1":null}, "phone": "1104564"} > cause an NullPointException: > {code:java} > java.io.IOException: Failed to deserialize Avro record. 
> at > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:150) > at > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:75) > at > org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:81) > at > org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) > at > org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.runFetchLoop(Kafka010Fetcher.java:147) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) > Caused by: java.lang.NullPointerException: null > at > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createConverter$57e941b$5(AvroRowDataDeserializationSchema.java:252) > at > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createMapConverter$7941d275$1(AvroRowDataDeserializationSchema.java:315) > at > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createNullableConverter$c3bac5d8$1(AvroRowDataDeserializationSchema.java:221) > at > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createRowConverter$80d8b6bd$1(AvroRowDataDeserializationSchema.java:206) > at > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:148) > ... 8 common frames omitted > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-19629) Fix NullPointException when deserializing map field with null value for Avro format
[ https://issues.apache.org/jira/browse/FLINK-19629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu updated FLINK-19629: Summary: Fix NullPointException when deserializing map field with null value for Avro format (was: Avro format cause NullPointException,as null value in MAP type's value type)
[jira] [Updated] (FLINK-19629) Avro format cause NullPointException,as null value in MAP type's value type
[ https://issues.apache.org/jira/browse/FLINK-19629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu updated FLINK-19629: Fix Version/s: 1.11.3
[GitHub] [flink] shizhengchao commented on pull request #13634: [FLINK-19629]Fix NullPointException in avro format as null value on MAP type
shizhengchao commented on pull request #13634: URL: https://github.com/apache/flink/pull/13634#issuecomment-714188821 > LGTM. > > Could you also prepare a pull request for release-1.11 branch? OK, I will finish it as soon as possible.
[GitHub] [flink] wuchong merged pull request #13634: [FLINK-19629]Fix NullPointException in avro format as null value on MAP type
wuchong merged pull request #13634: URL: https://github.com/apache/flink/pull/13634
[GitHub] [flink] flinkbot edited a comment on pull request #13736: [FLINK-19654][python][e2e] Reduce pyflink e2e test parallelism
flinkbot edited a comment on pull request #13736: URL: https://github.com/apache/flink/pull/13736#issuecomment-714183394 ## CI report: * 259c893dcb4b5692df54cfe7739f95d9e34096e6 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8056)
[GitHub] [flink] shizhengchao commented on a change in pull request #13634: [FLINK-19629]Fix NullPointException in avro format as null value on MAP type
shizhengchao commented on a change in pull request #13634: URL: https://github.com/apache/flink/pull/13634#discussion_r509848989 ## File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java ## @@ -178,7 +178,7 @@ private static AvroToRowDataConverter createArrayConverter(ArrayType arrayType) private static AvroToRowDataConverter createMapConverter(LogicalType type) { final AvroToRowDataConverter keyConverter = createConverter(DataTypes.STRING().getLogicalType()); Review comment: > Avro can't serialize null key for map. That's right, and this problem only exists when the type of map entry is `STRING()`
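The null-value problem in this thread can be illustrated with a small, self-contained sketch: a per-value converter that dereferences its input fails on `{"key1": null}`, while wrapping it in a null-tolerant converter passes the null through. The names `NullableConverterSketch`, `nullable` and `convertMap` are hypothetical and only mirror the idea of a nullable wrapper around a map value converter; this is not the code from the pull request, and it assumes string-typed map values for simplicity.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch; the real converters live in Flink's Avro format module.
class NullableConverterSketch {

    // Wraps a converter so that a null input is passed through instead of dereferenced.
    static <I, O> Function<I, O> nullable(Function<I, O> converter) {
        return value -> value == null ? null : converter.apply(value);
    }

    // Converts every entry. Keys are assumed non-null, since (as noted in the
    // review comment) Avro cannot serialize a null map key.
    static Map<String, String> convertMap(
            Map<?, ?> avroMap,
            Function<Object, String> keyConverter,
            Function<Object, String> valueConverter) {
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<?, ?> entry : avroMap.entrySet()) {
            result.put(
                    keyConverter.apply(entry.getKey()),
                    valueConverter.apply(entry.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> hobbies = new HashMap<>();
        hobbies.put("key1", null);

        // Calling toString() on a null value throws a NullPointerException.
        Function<Object, String> toStringConverter = Object::toString;

        // Only the value converter is wrapped; keys keep the plain converter.
        System.out.println(convertMap(hobbies, toStringConverter, nullable(toStringConverter)));
    }
}
```

Passing the unwrapped `toStringConverter` for values instead would reproduce a `NullPointerException` on the same input.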
[GitHub] [flink] wangyang0918 commented on a change in pull request #13644: [FLINK-19542][k8s]Implement LeaderElectionService and LeaderRetrievalService based on Kubernetes API
wangyang0918 commented on a change in pull request #13644: URL: https://github.com/apache/flink/pull/13644#discussion_r509848812 ## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionService.java ## @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.kubernetes.highavailability; + +import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; +import org.apache.flink.kubernetes.kubeclient.KubernetesLeaderElectionConfiguration; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch; +import org.apache.flink.kubernetes.utils.KubernetesUtils; +import org.apache.flink.runtime.leaderelection.AbstractLeaderElectionService; +import org.apache.flink.runtime.leaderelection.LeaderContender; +import org.apache.flink.util.function.FunctionUtils; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY; +import static org.apache.flink.kubernetes.utils.Constants.LEADER_ADDRESS_KEY; +import static org.apache.flink.kubernetes.utils.Constants.LEADER_SESSION_ID_KEY; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Leader election service for multiple JobManagers. The active JobManager is elected using Kubernetes. + * The current leader's address as well as its leader session ID is published via Kubernetes ConfigMap. + * Note that the contending lock and leader storage are using the same ConfigMap. And every component(e.g. + * ResourceManager, Dispatcher, RestEndpoint, JobManager for each job) will have a separate ConfigMap. 
+ */ +public class KubernetesLeaderElectionService extends AbstractLeaderElectionService { + + private final FlinkKubeClient kubeClient; + + private final Executor executor; + + private final String configMapName; + + private final KubernetesLeaderElector leaderElector; + + private KubernetesWatch kubernetesWatch; + + // Labels will be used to clean up the ha related ConfigMaps. + private Map configMapLabels; + + KubernetesLeaderElectionService( + FlinkKubeClient kubeClient, + Executor executor, + KubernetesLeaderElectionConfiguration leaderConfig) { + + this.kubeClient = checkNotNull(kubeClient, "Kubernetes client should not be null."); + this.executor = checkNotNull(executor, "Executor should not be null."); + this.configMapName = leaderConfig.getConfigMapName(); + this.leaderElector = kubeClient.createLeaderElector(leaderConfig, new LeaderCallbackHandlerImpl()); + this.leaderContender = null; + this.configMapLabels = KubernetesUtils.getConfigMapLabels( + leaderConfig.getClusterId(), LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY); + } + + @Override + public void internalStart(LeaderContender contender) { + CompletableFuture.runAsync(leaderElector::run, executor); + kubernetesWatch = kubeClient.watchConfigMaps(configMapName, new ConfigMapCallbackHandlerImpl()); + } + + @Override + public void internalStop() { + if (kubernetesWatch != null) { + kubernetesWatch.close(); + } + } + + @Override + protected void writeLeaderInformation() { + try { + kubeClient.checkAndUpdateConfigMap( + configMapName, + configMap -> { + if (leaderElector.hasLeadership(configMap)) { + // Get the updated ConfigMap with new leader information +
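The Javadoc above says that the contending lock and the leader information share one ConfigMap, and the truncated `writeLeaderInformation` only updates that ConfigMap when the elector still has leadership. A rough, self-contained model of that check-then-write shape follows. Everything in it is an assumption made for illustration: `ConfigMapStoreSketch`, `LeaderWriterSketch`, the key names and the `Optional`-returning update function are hypothetical and are not the `FlinkKubeClient` API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical in-memory stand-in for a single ConfigMap; not the Kubernetes client.
class ConfigMapStoreSketch {
    private final Map<String, String> data = new HashMap<>();

    // Hands a copy of the current data to the function and applies the result
    // only when the function returns a value. Returns whether an update happened.
    synchronized boolean checkAndUpdate(
            Function<Map<String, String>, Optional<Map<String, String>>> updateFunction) {
        Optional<Map<String, String>> updated = updateFunction.apply(new HashMap<>(data));
        if (!updated.isPresent()) {
            return false;
        }
        data.clear();
        data.putAll(updated.get());
        return true;
    }

    synchronized String get(String key) {
        return data.get(key);
    }
}

class LeaderWriterSketch {
    // Hypothetical key names, chosen for this sketch only.
    static final String LOCK_HOLDER_KEY = "lock-holder";
    static final String LEADER_ADDRESS_KEY = "leader-address";

    // Takes the lock unconditionally; a real elector would contend for it.
    static void acquireLock(ConfigMapStoreSketch store, String identity) {
        store.checkAndUpdate(configMap -> {
            configMap.put(LOCK_HOLDER_KEY, identity);
            return Optional.of(configMap);
        });
    }

    // Writes the leader address only if `identity` still holds the lock
    // recorded in the same map.
    static boolean writeLeaderInformation(
            ConfigMapStoreSketch store, String identity, String address) {
        return store.checkAndUpdate(configMap -> {
            if (identity.equals(configMap.get(LOCK_HOLDER_KEY))) {
                configMap.put(LEADER_ADDRESS_KEY, address);
                return Optional.of(configMap);
            }
            return Optional.empty(); // not the leader: leave the map untouched
        });
    }

    public static void main(String[] args) {
        ConfigMapStoreSketch store = new ConfigMapStoreSketch();
        acquireLock(store, "jobmanager-1");
        System.out.println(writeLeaderInformation(store, "jobmanager-2", "host-2:6123"));
        System.out.println(writeLeaderInformation(store, "jobmanager-1", "host-1:6123"));
        System.out.println(store.get(LEADER_ADDRESS_KEY));
    }
}
```

Keeping the lock and the leader data in one object means a stale contender cannot overwrite the address after losing leadership, because the check and the write happen in the same guarded update.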
[GitHub] [flink] flinkbot edited a comment on pull request #13711: [FLINK-19721] [flink-runtime] Support exponential backoff retries in RpcGatewayRetriever
flinkbot edited a comment on pull request #13711: URL: https://github.com/apache/flink/pull/13711#issuecomment-713051309 ## CI report: * 6c8fb272a1859f797ccd5cf62cb9392d9b287e9b Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7963) * 1f877b3d8220d0896d3ae433e20a855ae1481411 UNKNOWN
[GitHub] [flink] wuchong commented on pull request #13634: [FLINK-19629]Fix NullPointException in avro format as null value on MAP type
wuchong commented on pull request #13634: URL: https://github.com/apache/flink/pull/13634#issuecomment-714187780 The failed e2e test timed out. This pull request shouldn't affect the e2e tests. Merging...
[jira] [Commented] (FLINK-19754) Cannot have more than one execute() or executeAsync() call in a single environment.
[ https://issues.apache.org/jira/browse/FLINK-19754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17218704#comment-17218704 ] Jark Wu commented on FLINK-19754: - {{stmtSet.execute()}} already submits the job, so you don't need to, and shouldn't, call {{env.execute(requestPrm.getString("xxx"))}} again. > Cannot have more than one execute() or executeAsync() call in a single > environment. > --- > > Key: FLINK-19754 > URL: https://issues.apache.org/jira/browse/FLINK-19754 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.11.2 >Reporter: little-tomato >Priority: Major > > I run this code on my Standalone Cluster. When i submit the job,the error log > is as follows: > {code} > 2020-10-20 11:53:42,969 WARN > org.apache.flink.client.deployment.application.DetachedApplicationRunner [] - > Could not execute application: > org.apache.flink.client.program.ProgramInvocationException: The main method > caused an error: Cannot have more than one execute() or executeAsync() call > in a single environment.
> at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302) > ~[flink-clients_2.12-1.11.0.jar:1.11.0] > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) > ~[flink-clients_2.12-1.11.0.jar:1.11.0] > at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) > ~[flink-clients_2.12-1.11.0.jar:1.11.0] > at > org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78) > ~[flink-clients_2.12-1.11.0.jar:1.11.0] > at > org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:67) > ~[flink-clients_2.12-1.11.0.jar:1.11.0] > at > org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:100) > ~[flink-dist_2.12-1.11.2.jar:1.11.2] > at > java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) > [?:1.8.0_221] > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > [?:1.8.0_221] > at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_221] > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > [?:1.8.0_221] > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > [?:1.8.0_221] > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > [?:1.8.0_221] > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > [?:1.8.0_221] > at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221] > Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot have more > than one execute() or executeAsync() call in a single environment. 
> at > org.apache.flink.client.program.StreamContextEnvironment.validateAllowedExecution(StreamContextEnvironment.java:139) > ~[flink-clients_2.12-1.11.0.jar:1.11.0] > at > org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:127) > ~[flink-clients_2.12-1.11.0.jar:1.11.0] > at > org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76) > ~[flink-clients_2.12-1.11.0.jar:1.11.0] > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1697) > ~[flink-dist_2.12-1.11.2.jar:1.11.2] > at cn.cuiot.dmp.ruleengine.job.RuleEngineJob.main(RuleEngineJob.java:556) > ~[?:?] > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_221] > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > ~[?:1.8.0_221] > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > ~[?:1.8.0_221] > at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_221] > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) > ~[flink-clients_2.12-1.11.0.jar:1.11.0] > {code} > my code is: > {code:java} > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > EnvironmentSettings bsSettings = > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); > StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings); > ... > FlinkKafkaConsumer myConsumer = new > FlinkKafkaConsumer("kafkatopic", new SimpleStringSchema(), > properties); > myConsumer.setStartFromLatest(); > DataStream kafkaDataStream = env.addSource(myConsumer); > SingleOutputStreamOperator
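The comment at the top of this message (the statement set already submits the job, so a further `env.execute(...)` is a second submission) can be modelled with a toy environment that accepts exactly one submission. This is a sketch under stated assumptions, not Flink code: `SingleJobEnvironmentSketch`, `StatementSetSketch` and `SingleExecuteDemo` are hypothetical, and the model assumes that executing the statement set goes through the same environment, as the comment implies.

```java
// Toy model, not Flink code: every class and method name here is hypothetical.
class SingleJobEnvironmentSketch {
    private int submittedJobs = 0;

    // Mimics an environment that rejects a second submission.
    String execute(String jobName) {
        if (submittedJobs > 0) {
            throw new IllegalStateException(
                    "Cannot have more than one execute() or executeAsync() call "
                            + "in a single environment.");
        }
        submittedJobs++;
        return jobName;
    }

    int submittedJobs() {
        return submittedJobs;
    }
}

class StatementSetSketch {
    private final SingleJobEnvironmentSketch env;

    StatementSetSketch(SingleJobEnvironmentSketch env) {
        this.env = env;
    }

    // Assumption: executing the statement set submits a job through the shared environment.
    String execute() {
        return env.execute("insert-statements");
    }
}

class SingleExecuteDemo {
    // Returns true when the extra env.execute() after stmtSet.execute() is rejected.
    static boolean secondExecuteRejected() {
        SingleJobEnvironmentSketch env = new SingleJobEnvironmentSketch();
        new StatementSetSketch(env).execute(); // the job is submitted here
        try {
            env.execute("xxx"); // redundant second submission
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("second execute rejected: " + secondExecuteRejected());
    }
}
```

In terms of the reporter's program, the sketch corresponds to dropping the trailing `env.execute(...)` call and relying on the statement set's own submission.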
[jira] [Updated] (FLINK-19754) Cannot have more than one execute() or executeAsync() call in a single environment.
[ https://issues.apache.org/jira/browse/FLINK-19754?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu updated FLINK-19754: Description: I run this code on my Standalone Cluster. When i submit the job,the error log is as follows: {code} 2020-10-20 11:53:42,969 WARN org.apache.flink.client.deployment.application.DetachedApplicationRunner [] - Could not execute application: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Cannot have more than one execute() or executeAsync() call in a single environment. at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302) ~[flink-clients_2.12-1.11.0.jar:1.11.0] at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) ~[flink-clients_2.12-1.11.0.jar:1.11.0] at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) ~[flink-clients_2.12-1.11.0.jar:1.11.0] at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78) ~[flink-clients_2.12-1.11.0.jar:1.11.0] at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:67) ~[flink-clients_2.12-1.11.0.jar:1.11.0] at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:100) ~[flink-dist_2.12-1.11.2.jar:1.11.2] at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) [?:1.8.0_221] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_221] at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_221] at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_221] at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_221] at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_221] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_221] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221] Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot have more than one execute() or executeAsync() call in a single environment. at org.apache.flink.client.program.StreamContextEnvironment.validateAllowedExecution(StreamContextEnvironment.java:139) ~[flink-clients_2.12-1.11.0.jar:1.11.0] at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:127) ~[flink-clients_2.12-1.11.0.jar:1.11.0] at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76) ~[flink-clients_2.12-1.11.0.jar:1.11.0] at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1697) ~[flink-dist_2.12-1.11.2.jar:1.11.2] at cn.cuiot.dmp.ruleengine.job.RuleEngineJob.main(RuleEngineJob.java:556) ~[?:?] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_221] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_221] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_221] at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_221] at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) ~[flink-clients_2.12-1.11.0.jar:1.11.0] {code} my code is: {code:java} final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings); ... 
FlinkKafkaConsumer myConsumer = new FlinkKafkaConsumer("kafkatopic", new SimpleStringSchema(), properties);
myConsumer.setStartFromLatest();
DataStream kafkaDataStream = env.addSource(myConsumer);
SingleOutputStreamOperator sourceStream = kafkaDataStream.map(new MapFunction() { ... });
DataStream dataStreamRow = sourceStream.map(new MyMapFunction()).filter(new RuleDataProccessFunction()).map(new MapFunction() {
    private static final long serialVersionUID = 1L;
    @Override
    public Row map(MessageInfo value) throws Exception {
        ...
    }
}).returns(new RowTypeInfo(rowTypeArr, fieldArr));
tEnv.registerFunction("test", new TestFunction());
Table table = tEnv.fromDataStream(dataStreamRow, fieldStr);
tEnv.createTemporaryView("mytable", table);
String ddl = "CREATE TABLE user_log_1155 ...from kafka topic:user_log_1155";
tEnv.executeSql(ddl);
String ddl1 = "CREATE TABLE user_test_1155 ...from kafka topic:user_test_1155";
tEnv.executeSql(ddl);
StatementSet stmtSet = tEnv.createStatementSet();
[GitHub] [flink] wangyang0918 commented on a change in pull request #13644: [FLINK-19542][k8s]Implement LeaderElectionService and LeaderRetrievalService based on Kubernetes API
wangyang0918 commented on a change in pull request #13644: URL: https://github.com/apache/flink/pull/13644#discussion_r509847440 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/AbstractLeaderElectionService.java ## @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.leaderelection; + +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import java.util.UUID; + +/** + * Abstract class for leader election service based on distributed coordination system(e.g. Zookeeper, Kubernetes, etc.). + */ +public abstract class AbstractLeaderElectionService implements LeaderElectionService { + + protected final Logger logger = LoggerFactory.getLogger(getClass()); + + protected final Object lock = new Object(); + + /** The leader contender which applies for leadership. 
*/ + protected volatile LeaderContender leaderContender; + + private volatile UUID issuedLeaderSessionID; + + protected volatile UUID confirmedLeaderSessionID; + + protected volatile String confirmedLeaderAddress; + + protected volatile boolean running; + + protected AbstractLeaderElectionService() { + leaderContender = null; + + issuedLeaderSessionID = null; + confirmedLeaderSessionID = null; + confirmedLeaderAddress = null; + + running = false; + } + + @Override + public final void start(LeaderContender contender) throws Exception { + Preconditions.checkNotNull(contender, "Contender must not be null."); + Preconditions.checkState(leaderContender == null, "Contender was already set."); + + logger.info("Starting LeaderElectionService {}.", this); + + synchronized (lock) { + leaderContender = contender; + running = true; + internalStart(contender); + } + } + + @Override + public final void stop() throws Exception { + synchronized (lock) { + if (!running) { + return; + } + running = false; + clearConfirmedLeaderInformation(); + } + + logger.info("Stopping LeaderElectionService {}.", this); + + internalStop(); + } + + @Override + public void confirmLeadership(UUID leaderSessionID, String leaderAddress) { + if (logger.isDebugEnabled()) { + logger.debug( + "Confirm leader session ID {} for leader {}.", + leaderSessionID, + leaderAddress); + } + + Preconditions.checkNotNull(leaderSessionID); + + if (checkLeaderLatch()) { Review comment: This will be done in the composition refactor. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] danbosnichill commented on pull request #13711: [FLINK-19721] [flink-runtime] Support exponential backoff retries in RpcGatewayRetriever
danbosnichill commented on pull request #13711: URL: https://github.com/apache/flink/pull/13711#issuecomment-714186065 I switched to `java.time.Duration`. I did not switch other calls to `ExponentialBackoffRetryStrategy`. I do not know the code base well enough to say whether this is worth doing. For example, what are safe minimums? What frequency and backoff make sense?
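To make the open questions in this comment concrete, here is a minimal Java sketch of an exponential backoff schedule expressed with `java.time.Duration`. It is not Flink's `ExponentialBackoffRetryStrategy`: the class `BackoffSketch`, its constructor parameters, and the doubling factor are hypothetical and chosen only for illustration.

```java
import java.time.Duration;

/**
 * Hypothetical sketch of an exponential backoff schedule.
 * This is NOT Flink's ExponentialBackoffRetryStrategy; all names are illustrative.
 */
final class BackoffSketch {
    private final Duration initialDelay;
    private final Duration maxDelay;

    BackoffSketch(Duration initialDelay, Duration maxDelay) {
        if (initialDelay.isNegative() || initialDelay.isZero()) {
            throw new IllegalArgumentException("initialDelay must be positive");
        }
        if (maxDelay.compareTo(initialDelay) < 0) {
            throw new IllegalArgumentException("maxDelay must be >= initialDelay");
        }
        this.initialDelay = initialDelay;
        this.maxDelay = maxDelay;
    }

    /** Delay before retry number 'attempt' (0-based): initialDelay * 2^attempt, capped at maxDelay. */
    Duration delayForAttempt(int attempt) {
        if (attempt < 0) {
            throw new IllegalArgumentException("attempt must be >= 0");
        }
        Duration delay = initialDelay;
        for (int i = 0; i < attempt; i++) {
            // Stop doubling once the cap is reached so the value stays bounded.
            if (delay.compareTo(maxDelay) >= 0) {
                return maxDelay;
            }
            delay = delay.multipliedBy(2);
        }
        return delay.compareTo(maxDelay) > 0 ? maxDelay : delay;
    }
}
```

With an initial delay of 100 ms and a cap of 1 s, this schedule yields 100 ms, 200 ms, 400 ms, 800 ms, and then 1 s for every later attempt. Which minimum and cap are safe for `RpcGatewayRetriever` is exactly the question the comment leaves open.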
[GitHub] [flink] wuchong commented on a change in pull request #13634: [FLINK-19629]Fix NullPointException in avro format as null value on MAP type
wuchong commented on a change in pull request #13634: URL: https://github.com/apache/flink/pull/13634#discussion_r509846367 ## File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java ## @@ -178,7 +178,7 @@ private static AvroToRowDataConverter createArrayConverter(ArrayType arrayType) private static AvroToRowDataConverter createMapConverter(LogicalType type) { final AvroToRowDataConverter keyConverter = createConverter(DataTypes.STRING().getLogicalType()); Review comment: Avro can't serialize a null key for a map.
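The distinction behind this review comment can be sketched in plain Java: map values may be null and need a null-tolerant converter, while keys are assumed to be non-null. This is only an illustration and not the code in `AvroToRowDataConverters`. The class `NullableMapSketch`, the method `convertMap`, and the choice to throw on a null key are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical sketch; not part of Flink's Avro format. */
final class NullableMapSketch {

    /**
     * Converts every value with valueConverter, passing null values through
     * unchanged. Keys are assumed to be non-null and are turned into Strings.
     */
    static <I, O> Map<String, O> convertMap(Map<?, I> input, Function<I, O> valueConverter) {
        Map<String, O> result = new HashMap<>();
        for (Map.Entry<?, I> entry : input.entrySet()) {
            Object key = entry.getKey();
            if (key == null) {
                // Assumption for this sketch: fail fast instead of converting a null key.
                throw new IllegalArgumentException("null map key is not supported");
            }
            I value = entry.getValue();
            // Only the value path needs null handling.
            result.put(key.toString(), value == null ? null : valueConverter.apply(value));
        }
        return result;
    }
}
```

Calling `convertMap` on a map that contains `"b" -> null` keeps the entry with a null value instead of raising a `NullPointerException` inside the value converter.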