[jira] [Commented] (FLINK-19171) K8s Resource Manager may lead to resource leak after pod deleted
[ https://issues.apache.org/jira/browse/FLINK-19171?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17193388#comment-17193388 ] Xintong Song commented on FLINK-19171:
--
Hi [~yittg], Thanks for reporting this. I'm trying to understand why a pending pod would be deleted manually. Or, to put it another way, is it a reasonable requirement for Flink to handle such cases? Are there any other possible cases that may cause a pod to be removed without a terminated state?
As another input, there is a discussion (see FLINK-13554) about having a timeout for starting new workers, which might help in this case: if a worker is not started and registered within the timeout, the resource manager will abandon it and request a new worker.

> K8s Resource Manager may lead to resource leak after pod deleted
>
> Key: FLINK-19171
> URL: https://issues.apache.org/jira/browse/FLINK-19171
> Project: Flink
> Issue Type: Bug
> Reporter: Yi Tang
> Priority: Minor
>
> {code:java}
> private void terminatedPodsInMainThread(List<KubernetesPod> pods) {
>     getMainThreadExecutor().execute(() -> {
>         for (KubernetesPod pod : pods) {
>             if (pod.isTerminated()) {
>                 ...
>             }
>         }
>     });
> }
> {code}
> It looks like the RM only removes a pod from the ledger if the pod "isTerminated", and the pod is accounted for once it has been created.
> However, checking "isTerminated" alone is not complete; e.g. a Pending pod may be deleted manually.
> After that, a new job requiring more resources cannot trigger the allocation of a new pod.
> Please let me know if I misunderstand, thanks.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
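The bookkeeping gap the reporter describes can be made concrete with a minimal sketch. All names below are invented for illustration (plain Java, not Flink's actual resource manager classes): the point is that a DELETED event for a still-pending pod must release the ledger entry just as a terminated pod does, otherwise the entry leaks.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical pod ledger; names and structure are illustrative only.
class PodLedger {
    private final Set<String> requestedPods = new HashSet<>();

    void onPodCreated(String name) { requestedPods.add(name); }

    // Current behavior: bookkeeping is released only for terminated pods.
    void onPodTerminated(String name) { requestedPods.remove(name); }

    // Missing piece per the report: a deletion event for a still-pending pod
    // must also release the bookkeeping, otherwise the request leaks.
    void onPodDeleted(String name) { requestedPods.remove(name); }

    int outstanding() { return requestedPods.size(); }
}

public class PodLedgerDemo {
    public static void main(String[] args) {
        PodLedger ledger = new PodLedger();
        ledger.onPodCreated("taskmanager-1");
        ledger.onPodDeleted("taskmanager-1"); // pending pod deleted manually
        System.out.println(ledger.outstanding()); // prints 0: nothing leaked
    }
}
```

The FLINK-13554 timeout idea is complementary: even without a deletion callback, abandoning a worker that never registers within a deadline would eventually reclaim the ledger entry.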
[jira] [Assigned] (FLINK-19183) flink-connector-hive module compile failed with "cannot find symbol: variable TableEnvUtil"
[ https://issues.apache.org/jira/browse/FLINK-19183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee reassigned FLINK-19183:
Assignee: Jingsong Lee

> flink-connector-hive module compile failed with "cannot find symbol: variable TableEnvUtil"
> ---
>
> Key: FLINK-19183
> URL: https://issues.apache.org/jira/browse/FLINK-19183
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Hive
> Affects Versions: 1.12.0
> Reporter: Dian Fu
> Assignee: Jingsong Lee
> Priority: Blocker
> Labels: test-stability
> Fix For: 1.12.0
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=6416=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=9b1a0f88-517b-5893-fc93-76f4670982b4]
> {code}
> [ERROR] COMPILATION ERROR :
> [ERROR] /home/vsts/work/1/s/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java:[584,33] cannot find symbol
> symbol: variable TableEnvUtil
> location: class org.apache.flink.connectors.hive.TableEnvHiveConnectorITCase
> [ERROR] /home/vsts/work/1/s/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java:[589,33] cannot find symbol
> symbol: variable TableEnvUtil
> location: class org.apache.flink.connectors.hive.TableEnvHiveConnectorITCase
> {code}
[jira] [Commented] (FLINK-19183) flink-connector-hive module compile failed with "cannot find symbol: variable TableEnvUtil"
[ https://issues.apache.org/jira/browse/FLINK-19183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17193387#comment-17193387 ] Jingsong Lee commented on FLINK-19183:
--
Fixed in master: e319820b1b14304f00e40f3dba2b9c6e4ecda6bf
[GitHub] [flink-statefun] klion26 commented on pull request #131: [FLINK-18968] Translate README.md to Chinese
klion26 commented on pull request #131: URL: https://github.com/apache/flink-statefun/pull/131#issuecomment-690003117
Sorry for the late reply; I was dealing with some urgent matters. I've drafted a [Flink Stateful Functions Translation Specifications](https://docs.google.com/document/d/1Fv56PG50pHlDxZfF49iRhv-8dY7ZaCSHUCvBM2Vy0ao/edit?usp=sharing) document; please take a look when you're free, thanks. cc @tzulitai @carp84 @billyrrr
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #13347: [FLINK-19151][yarn]Fix the unit value according to different yarn scheduler
flinkbot edited a comment on pull request #13347: URL: https://github.com/apache/flink/pull/13347#issuecomment-688594190
## CI report:
* d71c0a34f1db6afd423517d1b514316ba394f33e Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6418)
Bot commands: The @flinkbot bot supports the following commands:
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] lsyldliu commented on pull request #11830: [FLINK-17096] [table] Support state ttl for Mini-Batch Group Agg using StateTtlConfig
lsyldliu commented on pull request #11830: URL: https://github.com/apache/flink/pull/11830#issuecomment-690001415
> Regarding `AggregateITCase#testListAggWithRetraction`, we should update `StateListView#remove` to not hard cast `getListState().get()` to `List`. We can use the iterator for removing, and maybe we need to support `remove` for `TtlListState#IteratorWithCleanup`.

For this problem, should we open a new Jira issue to discuss it?
[jira] [Comment Edited] (FLINK-19167) Proccess Function Example could not work
[ https://issues.apache.org/jira/browse/FLINK-19167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17193305#comment-17193305 ] tinny cat edited comment on FLINK-19167 at 9/10/20, 5:43 AM:
--
However, I set the `TimeCharacteristic` to event time, and assigned the watermark as event time. The conclusion I reached is: if ctx.timestamp() is assigned to `current.lastModified`, the following code will never execute:
{code:java}
if (timestamp == result.lastModified + 60000) {
    // emit the state on timeout
    out.collect(new Tuple2<String, Long>(result.key, result.count));
}
{code}
because `timestamp` is always less than result.lastModified

was (Author: tinny):
However, I set the `TimeCharacteristic` to event time, and assigned the watermark as event time. The conclusion I reached is: if ctx.timestamp() is assigned to `current.lastModified`, the following code will never execute:
{code:java}
if (timestamp == result.lastModified + 60000) {
    // emit the state on timeout
    out.collect(new Tuple2<String, Long>(result.key, result.count));
}
{code}
because `timestamp` always equals result.lastModified

> Proccess Function Example could not work
>
> Key: FLINK-19167
> URL: https://issues.apache.org/jira/browse/FLINK-19167
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.11.1
> Reporter: tinny cat
> Priority: Major
>
> Section "*Process Function Example*" of [https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html] currently reads:
> {code:java}
> @Override
> public void processElement(
>         Tuple2<String, String> value,
>         Context ctx,
>         Collector<Tuple2<String, Long>> out) throws Exception {
>     // retrieve the current count
>     CountWithTimestamp current = state.value();
>     if (current == null) {
>         current = new CountWithTimestamp();
>         current.key = value.f0;
>     }
>     // update the state's count
>     current.count++;
>     // set the state's timestamp to the record's assigned event time timestamp
>     current.lastModified = ctx.timestamp();
>     // write the state back
>     state.update(current);
>     // schedule the next timer 60 seconds from the current event time
>     ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
> }
>
> @Override
> public void onTimer(
>         long timestamp,
>         OnTimerContext ctx,
>         Collector<Tuple2<String, Long>> out) throws Exception {
>     // get the state for the key that scheduled the timer
>     CountWithTimestamp result = state.value();
>     // check if this is an outdated timer or the latest timer
>     // this will never happen
>     if (timestamp == result.lastModified + 60000) {
>         // emit the state on timeout
>         out.collect(new Tuple2<String, Long>(result.key, result.count));
>     }
> }
> {code}
> however, it should be:
> {code:java}
> @Override
> public void processElement(
>         Tuple2<String, String> value,
>         Context ctx,
>         Collector<Tuple2<String, Long>> out) throws Exception {
>     // retrieve the current count
>     CountWithTimestamp current = state.value();
>     if (current == null) {
>         current = new CountWithTimestamp();
>         current.key = value.f0;
>     }
>     // update the state's count
>     current.count++;
>     // set the state's timestamp to the record's assigned event time timestamp
>     // it should be the previous watermark
>     current.lastModified = ctx.timerService().currentWatermark();
>     // write the state back
>     state.update(current);
>     // schedule the next timer 60 seconds from the current event time
>     ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
> }
>
> @Override
> public void onTimer(
>         long timestamp,
>         OnTimerContext ctx,
>         Collector<Tuple2<String, Long>> out) throws Exception {
>     // get the state for the key that scheduled the timer
>     CountWithTimestamp result = state.value();
>     // check if this is an outdated timer or the latest timer
>     if (timestamp == result.lastModified + 60000) {
>         // emit the state on timeout
>         out.collect(new Tuple2<String, Long>(result.key, result.count));
>     }
> }
> {code}
> `current.lastModified = ctx.timestamp();` should be `current.lastModified = ctx.timerService().currentWatermark();`; otherwise, `timestamp == result.lastModified + 60000` will never happen
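The FLINK-19167 thread above hinges on when the `onTimer` guard can match. Event-time timer semantics can be simulated without Flink: the sketch below (plain Java, invented names, not Flink's implementation) registers a timer at `lastModified + 60000` and fires it once a watermark passes it, showing the guard does match provided no newer element re-registers the timer before the watermark advances.

```java
import java.util.TreeSet;

// Minimal event-time timer simulation; a timer registered for
// lastModified + 60000 fires with exactly that timestamp once the
// watermark advances past it.
public class EventTimeTimerSim {
    public static void main(String[] args) {
        long lastModified = 1_000L;          // event time of the element
        TreeSet<Long> timers = new TreeSet<>();
        timers.add(lastModified + 60_000L);  // registerEventTimeTimer

        long watermark = 70_000L;            // watermark eventually advances
        while (!timers.isEmpty() && timers.first() <= watermark) {
            long timestamp = timers.pollFirst();        // onTimer(timestamp)
            System.out.println(timestamp == lastModified + 60_000L); // true
        }
    }
}
```

Whether the guard fires in practice thus depends on how the watermark relates to element timestamps in the user's job, which is what the comment and its earlier version disagree about.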
[GitHub] [flink] flinkbot edited a comment on pull request #13365: [FLINK-16905][python] TableEnvironment.from_elements support Expression
flinkbot edited a comment on pull request #13365: URL: https://github.com/apache/flink/pull/13365#issuecomment-689524299
## CI report:
* 869dc1f796546eb2805c4b3036bb964418cda944 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6397)
* 56dc056bb0e4eebe438a82949d3b62796999db2b Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6419)
[GitHub] [flink] flinkbot edited a comment on pull request #13347: [FLINK-19151][yarn]Fix the unit value according to different yarn scheduler
flinkbot edited a comment on pull request #13347: URL: https://github.com/apache/flink/pull/13347#issuecomment-688594190
## CI report:
* d67ba16822c319db0c5b07f867b6fa5d8f84c68d Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6369)
* d71c0a34f1db6afd423517d1b514316ba394f33e Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6418)
[GitHub] [flink] flinkbot edited a comment on pull request #13365: [FLINK-16905][python] TableEnvironment.from_elements support Expression
flinkbot edited a comment on pull request #13365: URL: https://github.com/apache/flink/pull/13365#issuecomment-689524299
## CI report:
* 869dc1f796546eb2805c4b3036bb964418cda944 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6397)
* 56dc056bb0e4eebe438a82949d3b62796999db2b UNKNOWN
[GitHub] [flink] flinkbot edited a comment on pull request #13347: [FLINK-19151][yarn]Fix the unit value according to different yarn scheduler
flinkbot edited a comment on pull request #13347: URL: https://github.com/apache/flink/pull/13347#issuecomment-688594190
## CI report:
* d67ba16822c319db0c5b07f867b6fa5d8f84c68d Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6369)
* d71c0a34f1db6afd423517d1b514316ba394f33e UNKNOWN
[jira] [Issue Comment Deleted] (FLINK-19183) flink-connector-hive module compile failed with "cannot find symbol: variable TableEnvUtil"
[ https://issues.apache.org/jira/browse/FLINK-19183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu updated FLINK-19183:
Comment: was deleted (was: cc [~sujun1020] [~lzljs3620320])
[jira] [Commented] (FLINK-19183) flink-connector-hive module compile failed with "cannot find symbol: variable TableEnvUtil"
[ https://issues.apache.org/jira/browse/FLINK-19183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17193375#comment-17193375 ] Dian Fu commented on FLINK-19183:
--
cc [~sujun1020] [~lzljs3620320]
[jira] [Updated] (FLINK-19183) flink-connector-hive module compile failed with "cannot find symbol: variable TableEnvUtil"
[ https://issues.apache.org/jira/browse/FLINK-19183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu updated FLINK-19183:
Labels: test-stability (was: )
[jira] [Created] (FLINK-19183) flink-connector-hive module compile failed with "cannot find symbol: variable TableEnvUtil"
Dian Fu created FLINK-19183:
---
Summary: flink-connector-hive module compile failed with "cannot find symbol: variable TableEnvUtil"
Key: FLINK-19183
URL: https://issues.apache.org/jira/browse/FLINK-19183
Project: Flink
Issue Type: Bug
Components: Connectors / Hive
Affects Versions: 1.12.0
Reporter: Dian Fu
Fix For: 1.12.0

[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=6416=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=9b1a0f88-517b-5893-fc93-76f4670982b4]
[GitHub] [flink] lsyldliu commented on a change in pull request #11830: [FLINK-17096] [table] Support state ttl for Mini-Batch Group Agg using StateTtlConfig
lsyldliu commented on a change in pull request #11830: URL: https://github.com/apache/flink/pull/11830#discussion_r486064098
## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunction.java
## @@ -180,7 +179,7 @@ public void processElement(RowData input, Context ctx, Collector out) t
 // if this was not the first row and we have to emit retractions
 if (!firstRow) {
-    if (!stateCleaningEnabled && equaliser.equals(prevAggValue, newAggValue)) {
+    if (stateRetentionTime > 0 && equaliser.equals(prevAggValue, newAggValue)) {
Review comment: You are right, thanks.
[GitHub] [flink] flinkbot edited a comment on pull request #13003: [FLINK-18737][docs]translate jdbc connector
flinkbot edited a comment on pull request #13003: URL: https://github.com/apache/flink/pull/13003#issuecomment-664844064
## CI report:
* 565d353b41557312917ef867210bb731dd972fe7 UNKNOWN
* 5814af2fbd77b09bcd257ec8832d093e9a046098 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6412)
[GitHub] [flink] xintongsong commented on a change in pull request #13347: [FLINK-19151][yarn]Fix the unit value according to different yarn scheduler
xintongsong commented on a change in pull request #13347: URL: https://github.com/apache/flink/pull/13347#discussion_r486032936 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java ## @@ -143,8 +149,9 @@ private InternalContainerResource createAndMapContainerResource(final WorkerReso /** * Normalize to the minimum integer that is greater or equal to 'value' and is positive integer multiple of 'unitValue'. Review comment: JavaDoc needs update. ## File path: flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java ## @@ -58,4 +61,30 @@ public void testDeleteApplicationFiles() throws Exception { assertThat(files.count(), equalTo(0L)); } } + + @Test + public void testGetUnitResource() { + YarnConfiguration yarnConfig = new YarnConfiguration(); + + yarnConfig.set("yarn.resourcemanager.scheduler.class", "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler"); Review comment: We should always try to avoid magic string literals if possible. 
## File path: flink-yarn/src/test/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapterTest.java ## @@ -106,6 +110,31 @@ public void testMatchVcores() { assertThat(adapter.getWorkerSpecs(containerResource1, strategy), containsInAnyOrder(workerSpec1, workerSpec2)); assertThat(adapter.getWorkerSpecs(containerResource2, strategy), contains(workerSpec3)); assertThat(adapter.getWorkerSpecs(containerResource3, strategy), contains(workerSpec4)); + + final int unitMemMB1 = 150; + final int unitVcore1 = 15; + final WorkerSpecContainerResourceAdapter adapter1 = + new WorkerSpecContainerResourceAdapter( + getConfigProcessSpecEqualsWorkerSpec(), + minMemMB, + minVcore, + Integer.MAX_VALUE, + Integer.MAX_VALUE, + unitMemMB1, + unitVcore1, + Collections.emptyMap()); + + final Resource containerResource4 = Resource.newInstance(150, 15); + + assertThat(adapter1.getWorkerSpecs(containerResource1, strategy), empty()); + assertThat(adapter1.getWorkerSpecs(containerResource2, strategy), empty()); + + assertThat(adapter1.tryComputeContainerResource(workerSpec1).get(), is(containerResource4)); + assertThat(adapter1.tryComputeContainerResource(workerSpec2).get(), is(containerResource4)); + assertThat(adapter1.tryComputeContainerResource(workerSpec3).get(), is(containerResource4)); + assertThat(adapter1.tryComputeContainerResource(workerSpec4).get(), is(containerResource4)); + + assertThat(adapter1.getWorkerSpecs(containerResource4, strategy), containsInAnyOrder(workerSpec1, workerSpec2, workerSpec3, workerSpec4)); Review comment: I'm not sure whether it is necessary to introduce another adapter. Should be good enough to tune the parameters of existing cases. I guess the problem is that the original test case did not make its intention explicit. 
## File path: flink-yarn/src/test/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapterTest.java ## @@ -169,6 +202,31 @@ public void testIgnoreVcores() { assertThat(adapter.getWorkerSpecs(containerResource4, strategy), containsInAnyOrder(workerSpec1, workerSpec2, workerSpec3)); assertThat(adapter.getWorkerSpecs(containerResource5, strategy), contains(workerSpec4)); + + final int unitMemMB1 = 150; + final int unitVcore1 = 15; + final WorkerSpecContainerResourceAdapter adapter1 = + new WorkerSpecContainerResourceAdapter( + getConfigProcessSpecEqualsWorkerSpec(), + minMemMB, + minVcore, + Integer.MAX_VALUE, + Integer.MAX_VALUE, + unitMemMB1, + unitVcore1, + Collections.emptyMap()); + + final Resource containerResource6 = Resource.newInstance(150, 15); + + assertThat(adapter1.tryComputeContainerResource(workerSpec1).get(), is(containerResource6)); + assertThat(adapter1.tryComputeContainerResource(workerSpec2).get(), is(containerResource6)); + assertThat(adapter1.tryComputeContainerResource(workerSpec3).get(), is(containerResource6)); +
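The normalization under review ("the minimum integer that is greater or equal to 'value' and is a positive integer multiple of 'unitValue'") can be sketched in isolation. This is an illustrative standalone version using the standard integer ceiling-division trick, not Flink's actual implementation:

```java
public class NormalizeSketch {
    // Round 'value' up to the smallest multiple of 'unitValue' that is
    // greater than or equal to it; assumes value >= 0 and unitValue > 0.
    static int normalize(int value, int unitValue) {
        return ((value + unitValue - 1) / unitValue) * unitValue;
    }

    public static void main(String[] args) {
        System.out.println(normalize(150, 150)); // 150: already a multiple
        System.out.println(normalize(151, 150)); // 300: rounded up
        System.out.println(normalize(1, 15));    // 15: at least one unit
    }
}
```

With unit memory 150 MB and unit vcores 15 as in the test above, every worker spec in that range normalizes to the same container resource (150, 15), which is why all four worker specs map to `containerResource4`.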
[GitHub] [flink] flinkbot edited a comment on pull request #13296: [FLINK-18774][format][debezium] Support debezium-avro format
flinkbot edited a comment on pull request #13296: URL: https://github.com/apache/flink/pull/13296#issuecomment-684611193
## CI report:
* c266341c63cc4cdc28a6585a82294710fd8ebbc5 UNKNOWN
* cb369a8127cc00759e82f2fe58e06737a5eadb7e Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6417)
[GitHub] [flink] wuchong commented on a change in pull request #11830: [FLINK-17096] [table] Support state ttl for Mini-Batch Group Agg using StateTtlConfig
wuchong commented on a change in pull request #11830: URL: https://github.com/apache/flink/pull/11830#discussion_r486054242
## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunction.java
## @@ -180,7 +179,7 @@ public void processElement(RowData input, Context ctx, Collector out) t
 // if this was not the first row and we have to emit retractions
 if (!firstRow) {
-    if (!stateCleaningEnabled && equaliser.equals(prevAggValue, newAggValue)) {
+    if (stateRetentionTime > 0 && equaliser.equals(prevAggValue, newAggValue)) {
Review comment: I think this is the one that causes `RetractionITCase#testUniqueProcess` to fail. It should be `stateRetentionTime <= 0` here to indicate that state cleaning is not enabled.
[GitHub] [flink] JingsongLi commented on pull request #13276: [FLINK-18604][connectors/HBase] HBase ConnectorDescriptor can not work in Table API
JingsongLi commented on pull request #13276: URL: https://github.com/apache/flink/pull/13276#issuecomment-689969917
Hi @pyscala, I meant we should add an ITCase for this; I think this bug was caused by the absence of an ITCase in the previous PR. (Maybe add a case in `HBaseConnectorITCase`.)
[GitHub] [flink] wuchong commented on pull request #11830: [FLINK-17096] [table] Support state ttl for Mini-Batch Group Agg using StateTtlConfig
wuchong commented on pull request #11830: URL: https://github.com/apache/flink/pull/11830#issuecomment-689969724
Regarding `AggregateITCase#testListAggWithRetraction`, we should update `StateListView#remove` to not hard cast `getListState().get()` to `List`. We can use the iterator for removing, and maybe we need to support `remove` for `TtlListState#IteratorWithCleanup`.
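The suggestion above — remove via the iterator rather than casting the `Iterable` to `List` — can be sketched in isolation. The names below are generic, not Flink's state classes; the point is that `Iterator.remove()` works for any iterable whose iterator supports it, without assuming the backing collection type:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class IterRemoveSketch {
    // Remove the first occurrence of 'toRemove' through the iterator,
    // without assuming the Iterable is backed by a List.
    static <T> boolean removeFirst(Iterable<T> iterable, T toRemove) {
        for (Iterator<T> it = iterable.iterator(); it.hasNext(); ) {
            if (it.next().equals(toRemove)) {
                it.remove();
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<String> values = new ArrayList<>(List.of("a", "b", "c"));
        System.out.println(removeFirst(values, "b")); // true
        System.out.println(values);                   // [a, c]
    }
}
```

For a TTL-wrapped state iterator this additionally requires that the wrapping iterator (e.g. `TtlListState#IteratorWithCleanup`) forwards `remove()` to the underlying iterator, which is exactly the support the comment says may be missing.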
[GitHub] [flink] wuchong commented on a change in pull request #11830: [FLINK-17096] [table] Support state ttl for Mini-Batch Group Agg using StateTtlConfig
wuchong commented on a change in pull request #11830: URL: https://github.com/apache/flink/pull/11830#discussion_r486053094
## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunction.java
## @@ -83,61 +85,58 @@
 // stores the accumulators
 private transient ValueState accState = null;
+private final long stateRetentionTime;
Review comment: Move this member variable up to sit with the other `final` variables.
[jira] [Closed] (FLINK-17779) Orc file format support filter push down
[ https://issues.apache.org/jira/browse/FLINK-17779?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee closed FLINK-17779. Resolution: Fixed master: cbda1f90faed90d6f6cbd9a7f16fecf004e1245d > Orc file format support filter push down > > > Key: FLINK-17779 > URL: https://issues.apache.org/jira/browse/FLINK-17779 > Project: Flink > Issue Type: Sub-task > Components: Connectors / ORC >Reporter: Jingsong Lee >Assignee: sujun >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-17779) Orc file format support filter push down
[ https://issues.apache.org/jira/browse/FLINK-17779?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee updated FLINK-17779: - Fix Version/s: 1.12.0 > Orc file format support filter push down > > > Key: FLINK-17779 > URL: https://issues.apache.org/jira/browse/FLINK-17779 > Project: Flink > Issue Type: Sub-task > Components: Connectors / ORC >Reporter: Jingsong Lee >Assignee: sujun >Priority: Major > Labels: pull-request-available > Fix For: 1.12.0 > >
[jira] [Closed] (FLINK-19070) Hive connector should throw a meaningful exception if user reads/writes ACID tables
[ https://issues.apache.org/jira/browse/FLINK-19070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee closed FLINK-19070. Resolution: Fixed master: fa651801ac3ba711beb57594ac58e4c36090af40 > Hive connector should throw a meaningful exception if user reads/writes ACID > tables > --- > > Key: FLINK-19070 > URL: https://issues.apache.org/jira/browse/FLINK-19070 > Project: Flink > Issue Type: Improvement > Components: Connectors / Hive >Reporter: Rui Li >Assignee: Nicholas Jiang >Priority: Major > Labels: pull-request-available > Fix For: 1.12.0 > >
[GitHub] [flink] JingsongLi merged pull request #13306: [FLINK-17779][Connectors/ORC]Orc file format support filter push down
JingsongLi merged pull request #13306: URL: https://github.com/apache/flink/pull/13306
[GitHub] [flink] JingsongLi commented on pull request #13315: [FLINK-19070][hive] Hive connector should throw a meaningful exception if user reads/writes ACID tables
JingsongLi commented on pull request #13315: URL: https://github.com/apache/flink/pull/13315#issuecomment-689966672 Merging...
[GitHub] [flink] JingsongLi merged pull request #13315: [FLINK-19070][hive] Hive connector should throw a meaningful exception if user reads/writes ACID tables
JingsongLi merged pull request #13315: URL: https://github.com/apache/flink/pull/13315
[jira] [Commented] (FLINK-19175) Tests in JoinITCase do not test BroadcastHashJoin
[ https://issues.apache.org/jira/browse/FLINK-19175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17193345#comment-17193345 ] Jark Wu commented on FLINK-19175: - Assigned to you [~danny0405] > Tests in JoinITCase do not test BroadcastHashJoin > - > > Key: FLINK-19175 > URL: https://issues.apache.org/jira/browse/FLINK-19175 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner, Tests >Reporter: Dawid Wysakowicz >Assignee: Danny Chen >Priority: Major > > The tests in JoinITCase claim to test the {{BroadcastHashJoin}}, but they > actually do not. None of the tables used in the tests have proper statistics > therefore, none of the tables meet the threshold for the broadcast join. At > the same time the {{ShuffleHashJoin}} is not disabled, therefore they > silently fallback to {{ShuffleHashJoin}}. > In summary none (or at least not all of the tests) are executed for > BroadcastHashJoin, but are executed twice for ShuffleHashJoin.
[jira] [Assigned] (FLINK-19175) Tests in JoinITCase do not test BroadcastHashJoin
[ https://issues.apache.org/jira/browse/FLINK-19175?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu reassigned FLINK-19175: --- Assignee: Danny Chen > Tests in JoinITCase do not test BroadcastHashJoin > - > > Key: FLINK-19175 > URL: https://issues.apache.org/jira/browse/FLINK-19175 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner, Tests >Reporter: Dawid Wysakowicz >Assignee: Danny Chen >Priority: Major > > The tests in JoinITCase claim to test the {{BroadcastHashJoin}}, but they > actually do not. None of the tables used in the tests have proper statistics > therefore, none of the tables meet the threshold for the broadcast join. At > the same time the {{ShuffleHashJoin}} is not disabled, therefore they > silently fallback to {{ShuffleHashJoin}}. > In summary none (or at least not all of the tests) are executed for > BroadcastHashJoin, but are executed twice for ShuffleHashJoin.
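For context, one possible way a test could actually force the `BroadcastHashJoin` path is via two Blink-planner options, `table.exec.disabled-operators` and `table.optimizer.join.broadcast-threshold`. This is a sketch only, not the fix chosen for this issue; the variable name `tEnv` and the exact threshold value are illustrative assumptions.

```java
// Hypothetical test setup: rule out the fallback operators and raise the
// broadcast threshold so small test tables still qualify for broadcast.
tEnv.getConfig().getConfiguration().setString(
        "table.exec.disabled-operators", "ShuffleHashJoin,SortMergeJoin,NestedLoopJoin");
tEnv.getConfig().getConfiguration().setLong(
        "table.optimizer.join.broadcast-threshold", 1024L * 1024L * 1024L);
```

With the other join operators disabled, a silent fallback to `ShuffleHashJoin` would surface as a planning failure instead of a passing-but-misnamed test.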
[GitHub] [flink] JingsongLi commented on a change in pull request #13010: [FLINK-18735][table] Add support for more types to DataGen source
JingsongLi commented on a change in pull request #13010: URL: https://github.com/apache/flink/pull/13010#discussion_r486047882 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/DataGeneratorSource.java ## @@ -34,9 +39,19 @@ private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(DataGeneratorSource.class); + private final DataGenerator generator; + private final long rowsPerSecond; + @Nullable + private Long numberOfRows; + + private int outputSoFar; Review comment: transient ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/DataGeneratorSource.java ## @@ -34,9 +39,19 @@ private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(DataGeneratorSource.class); + private final DataGenerator generator; + private final long rowsPerSecond; + @Nullable + private Long numberOfRows; + + private int outputSoFar; + + private int toOutput; Review comment: transient ## File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/RandomGeneratorVisitor.java ## @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.factories.datagen; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator; +import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.GenericMapData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.factories.datagen.types.DataGeneratorMapper; +import org.apache.flink.table.factories.datagen.types.RowDataGenerator; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.BooleanType; +import org.apache.flink.table.types.logical.CharType; +import org.apache.flink.table.types.logical.DayTimeIntervalType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.DoubleType; +import org.apache.flink.table.types.logical.FloatType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.MultisetType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.SmallIntType; +import org.apache.flink.table.types.logical.TinyIntType; +import org.apache.flink.table.types.logical.VarCharType; +import 
org.apache.flink.table.types.logical.YearMonthIntervalType; + +import java.math.BigDecimal; +import java.math.MathContext; +import java.math.RoundingMode; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.flink.configuration.ConfigOptions.key; +import static org.apache.flink.table.factories.DataGenTableSourceFactory.FIELDS; +import static org.apache.flink.table.factories.DataGenTableSourceFactory.LENGTH; +import static org.apache.flink.table.factories.DataGenTableSourceFactory.MAX; +import static org.apache.flink.table.factories.DataGenTableSourceFactory.MIN; + + +/** + * Creates a random {@link DataGeneratorContainer} for a particular logical type. + */ +@Internal +@SuppressWarnings("unchecked") +public class RandomGeneratorVisitor extends DataGenVisitorBase { +
[jira] [Commented] (FLINK-19175) Tests in JoinITCase do not test BroadcastHashJoin
[ https://issues.apache.org/jira/browse/FLINK-19175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17193344#comment-17193344 ] Danny Chen commented on FLINK-19175: Nice catch, [~dwysakowicz], can I take this issue? > Tests in JoinITCase do not test BroadcastHashJoin > - > > Key: FLINK-19175 > URL: https://issues.apache.org/jira/browse/FLINK-19175 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner, Tests >Reporter: Dawid Wysakowicz >Priority: Major > > The tests in JoinITCase claim to test the {{BroadcastHashJoin}}, but they > actually do not. None of the tables used in the tests have proper statistics > therefore, none of the tables meet the threshold for the broadcast join. At > the same time the {{ShuffleHashJoin}} is not disabled, therefore they > silently fallback to {{ShuffleHashJoin}}. > In summary none (or at least not all of the tests) are executed for > BroadcastHashJoin, but are executed twice for ShuffleHashJoin.
[GitHub] [flink] flinkbot edited a comment on pull request #13214: [FLINK-18938][tableSQL/API] Throw better exception message for quering sink-only connector
flinkbot edited a comment on pull request #13214: URL: https://github.com/apache/flink/pull/13214#issuecomment-678119788 ## CI report: * 8c8fcc0f241851849e0b3faefa88aea5d6649662 UNKNOWN * 734bb7cb193a2d5067a957ce2def7a926fe09589 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6375) * aa78f74d8efbf509e2d9247d43cd68ff8e9db0b0 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6415) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] JingsongLi commented on a change in pull request #13010: [FLINK-18735][table] Add support for more types to DataGen source
JingsongLi commented on a change in pull request #13010: URL: https://github.com/apache/flink/pull/13010#discussion_r486047447 ## File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/DataGenVisitorBase.java ## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.factories.datagen; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.types.logical.DateType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.TimeType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.ZonedTimestampType; +import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor; + +import java.io.Serializable; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.util.function.Supplier; + +/** + * Base class for translating {@link LogicalType LogicalTypes} to {@link DataGeneratorContainer}'s. 
+ */ +public abstract class DataGenVisitorBase extends LogicalTypeDefaultVisitor { + + protected final String name; + + protected final ReadableConfig config; + + protected DataGenVisitorBase(String name, ReadableConfig config) { + this.name = name; + this.config = config; + } + + @Override + public DataGeneratorContainer visit(DateType dateType) { + return DataGeneratorContainer.of(TimeGenerator.of(LocalDate::now)); + } + + @Override + public DataGeneratorContainer visit(TimeType timeType) { + return DataGeneratorContainer.of(TimeGenerator.of(LocalTime::now)); + } + + @Override + public DataGeneratorContainer visit(TimestampType timestampType) { + return DataGeneratorContainer.of(TimeGenerator.of(LocalDateTime::now)); + } + + @Override + public DataGeneratorContainer visit(ZonedTimestampType zonedTimestampType) { + return DataGeneratorContainer.of(TimeGenerator.of(OffsetDateTime::now)); + } + + @Override + public DataGeneratorContainer visit(LocalZonedTimestampType localZonedTimestampType) { + return DataGeneratorContainer.of(TimeGenerator.of(Instant::now)); + } + + @Override + protected DataGeneratorContainer defaultMethod(LogicalType logicalType) { + throw new ValidationException("Unsupported type: " + logicalType); + } + + private interface SerializableSupplier extends Supplier, Serializable { } Review comment: I meant we could do something like `return (Supplier & Serializable) () -> {...}`, but it is OK for now. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] RocMarshal commented on a change in pull request #13225: [FLINK-18974][docs-zh]Translate the 'User-Defined Functions' page of "Application Development's DataStream API" into Chinese
RocMarshal commented on a change in pull request #13225: URL: https://github.com/apache/flink/pull/13225#discussion_r486043441 ## File path: docs/dev/user_defined_functions.zh.md ## @@ -114,30 +119,31 @@ data.reduce { (i1,i2) => i1 + i2 } data.reduce { _ + _ } {% endhighlight %} + + ## Rich functions -All transformations that take as argument a lambda function can -instead take as argument a *rich* function. For example, instead of +所有将 lambda 表达式作为参数的转化操作都可以用 *rich* function 来代替。例如,代替 {% highlight scala %} data.map { x => x.toInt } {% endhighlight %} -you can write +你可以写成 {% highlight scala %} class MyMapFunction extends RichMapFunction[String, Int] { def map(in: String):Int = { in.toInt } }; {% endhighlight %} -and pass the function to a `map` transformation: +并将函数传递给 `map` transformation: {% highlight scala %} data.map(new MyMapFunction()) {% endhighlight %} -Rich functions can also be defined as an anonymous class: +富函数也可以定义成匿名类: Review comment: We need to make some necessary decisions. ```function``` or ```函数```? ```rich function``` or ```富函数```? We'd better keep the special nouns and keywords consistent in the translation, which is more rigorous. Can you tell me what you think? ## File path: docs/dev/user_defined_functions.zh.md ## @@ -23,16 +23,16 @@ specific language governing permissions and limitations under the License. --> -Most operations require a user-defined function. This section lists different -ways of how they can be specified. We also cover `Accumulators`, which can be -used to gain insights into your Flink application. 
+大多数操作都需要用户自定义函数。本节列出了实现用户自定义函数的不同方式。还会介绍 `Accumulators`(累加器),可用于深入了解你的 Flink 应用程序。 Review comment: ```suggestion 大多数操作都需要用户自定义函数(function)。本节列出了实现用户自定义函数的不同方式。还会介绍 `Accumulators`(累加器),可用于深入了解你的 Flink 应用程序。 ``` ## File path: docs/dev/user_defined_functions.zh.md ## @@ -62,32 +66,33 @@ data.filter(s -> s.startsWith("http://;)); data.reduce((i1,i2) -> i1 + i2); {% endhighlight %} + + ## Rich functions -All transformations that require a user-defined function can -instead take as argument a *rich* function. For example, instead of +所有需要用户自定义函数的转化操作都可以将 *rich* function 作为参数。例如,代替 {% highlight java %} class MyMapFunction implements MapFunction { public Integer map(String value) { return Integer.parseInt(value); } }; {% endhighlight %} -you can write +你可以写成 Review comment: ```suggestion 更改后替换为如下内容 ``` ## File path: docs/dev/user_defined_functions.zh.md ## @@ -147,95 +153,78 @@ data.map (new RichMapFunction[String, Int] { -Rich functions provide, in addition to the user-defined function (map, -reduce, etc), four methods: `open`, `close`, `getRuntimeContext`, and -`setRuntimeContext`. These are useful for parameterizing the function -(see [Passing Parameters to Functions]({{ site.baseurl }}/dev/batch/index.html#passing-parameters-to-functions)), -creating and finalizing local state, accessing broadcast variables (see -[Broadcast Variables]({{ site.baseurl }}/dev/batch/index.html#broadcast-variables)), and for accessing runtime -information such as accumulators and counters (see -[Accumulators and Counters](#accumulators--counters)), and information -on iterations (see [Iterations]({{ site.baseurl }}/dev/batch/iterations.html)). 
+除了用户自定义的功能(map,reduce 等),Rich functions 还提供了四个方法:`open`、`close`、`getRuntimeContext` 和 +`setRuntimeContext`。这些方法对于参数化函数 +(参阅 [给函数传递参数]({% link dev/batch/index.zh.md %}#passing-parameters-to-functions)), +创建和最终确定本地状态,访问广播变量(参阅 +[广播变量]({% link dev/batch/index.zh.md %}#broadcast-variables )),以及访问运行时信息,例如累加器和计数器(参阅 +[累加器和计数器](#accumulators--counters)),以及迭代器的相关信息(参阅 [迭代器]({% link dev/batch/iterations.zh.md %})) +有很大作用。 {% top %} -## Accumulators & Counters + -Accumulators are simple constructs with an **add operation** and a **final accumulated result**, -which is available after the job ended. +## 累加器和计数器 -The most straightforward accumulator is a **counter**: You can increment it using the -```Accumulator.add(V value)``` method. At the end of the job Flink will sum up (merge) all partial -results and send the result to the client. Accumulators are useful during debugging or if you -quickly want to find out more about your data. +累加器是具有**加法运算**和**最终累加结果**的一种简单结构,可在作业结束后使用。 -Flink currently has the following **built-in accumulators**. Each of them implements the -{% gh_link /flink-core/src/main/java/org/apache/flink/api/common/accumulators/Accumulator.java "Accumulator" %} -interface. +最简单的累加器就是**计数器**: 你可以使用 +```Accumulator.add(V value)``` 方法将其递增。在作业结束时,Flink 会汇总(合并)所有部分的结果并将其发送给客户端。 +在调试过程中或在你想快速了解有关数据更多信息时,累加器作用很大。 + +Flink 目前有如下**内置累加器**。每个都实现了 +{% gh_link /flink-core/src/main/java/org/apache/flink/api/common/accumulators/Accumulator.java "累加器" %} +接口。 - {% gh_link
[GitHub] [flink] sujun1020 commented on pull request #13306: [FLINK-17779][Connectors/ORC]Orc file format support filter push down
sujun1020 commented on pull request #13306: URL: https://github.com/apache/flink/pull/13306#issuecomment-689960361 @JingsongLi Do you have time to review this PR again?
[GitHub] [flink-statefun] tzulitai commented on a change in pull request #143: [FLINK-19017] Add logging and metrics for remote function invocations
tzulitai commented on a change in pull request #143: URL: https://github.com/apache/flink-statefun/pull/143#discussion_r486045078 ## File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/metrics/RemoteInvocationMetrics.java ## @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.statefun.flink.core.metrics; + +public interface RemoteInvocationMetrics { + + void remoteInvocationFailures(); + + void remoteInvocationLatency(long elapsed); Review comment: I think throughout the project, for time/duration-related parameters, the convention was to use Java's `Duration` class. What do you think about using that here as well?
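The reviewed interface with the latency parameter typed as `java.time.Duration`, as tzulitai suggests, could look like the sketch below. The `RecordingMetrics` implementation is purely illustrative and not part of StateFun.

```java
import java.time.Duration;

public class DurationMetricsDemo {

    // Same shape as the interface under review, but with Duration instead of
    // a bare long, so the unit is carried by the type rather than convention.
    interface RemoteInvocationMetrics {
        void remoteInvocationFailures();
        void remoteInvocationLatency(Duration elapsed);
    }

    // Hypothetical in-memory implementation, just to exercise the interface.
    static final class RecordingMetrics implements RemoteInvocationMetrics {
        int failures;
        Duration lastLatency = Duration.ZERO;

        @Override public void remoteInvocationFailures() { failures++; }
        @Override public void remoteInvocationLatency(Duration elapsed) { lastLatency = elapsed; }
    }

    public static void main(String[] args) {
        RecordingMetrics metrics = new RecordingMetrics();
        metrics.remoteInvocationLatency(Duration.ofMillis(250));
        metrics.remoteInvocationFailures();
        System.out.println(metrics.lastLatency.toMillis()); // 250
        System.out.println(metrics.failures);               // 1
    }
}
```

The advantage the reviewer alludes to: a `long elapsed` parameter leaves the unit (millis? nanos?) ambiguous at every call site, while `Duration` makes conversions explicit (`toMillis()`, `toNanos()`).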
[GitHub] [flink] flinkbot edited a comment on pull request #13296: [FLINK-18774][format][debezium] Support debezium-avro format
flinkbot edited a comment on pull request #13296: URL: https://github.com/apache/flink/pull/13296#issuecomment-684611193 ## CI report: * c266341c63cc4cdc28a6585a82294710fd8ebbc5 UNKNOWN * f1afb0e01b22163360d466c6fa3bdd09da42d46c Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6392) * cb369a8127cc00759e82f2fe58e06737a5eadb7e UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] danny0405 commented on a change in pull request #13352: [FLINK-19092][sql-parser] Parse comment on computed column failed
danny0405 commented on a change in pull request #13352: URL: https://github.com/apache/flink/pull/13352#discussion_r486044814 ## File path: flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlTableColumn.java ## @@ -65,34 +67,55 @@ public SqlTableColumn(SqlIdentifier name, this.comment = comment; } + public SqlTableColumn(SqlIdentifier name, + SqlNode expr, + @Nullable SqlCharStringLiteral comment, + SqlParserPos pos) { + super(pos); + this.name = requireNonNull(name, "Column name should not be null"); + this.expr = requireNonNull(expr, "Column expression should not be null"); + this.comment = comment; + } + @Override public SqlOperator getOperator() { return OPERATOR; } @Override public List getOperandList() { - return ImmutableNullableList.of(name, type, comment); + return isComputed() ? + ImmutableNullableList.of(name, expr, comment) : + ImmutableNullableList.of(name, type, comment); } @Override public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { this.name.unparse(writer, leftPrec, rightPrec); - writer.print(" "); - this.type.unparse(writer, leftPrec, rightPrec); - if (!this.type.getNullable()) { - // Default is nullable. - writer.keyword("NOT NULL"); - } - if (this.constraint != null) { - this.constraint.unparse(writer, leftPrec, rightPrec); + if (isComputed()) { + writer.keyword("AS"); + this.expr.unparse(writer, leftPrec, rightPrec); + } else { + writer.print(" "); + this.type.unparse(writer, leftPrec, rightPrec); + if (!this.type.getNullable()) { + // Default is nullable. 
+ writer.keyword("NOT NULL"); + } + if (this.constraint != null) { + this.constraint.unparse(writer, leftPrec, rightPrec); + } } if (this.comment != null) { writer.print(" COMMENT "); this.comment.unparse(writer, leftPrec, rightPrec); } } + public boolean isComputed() { + return type == null && expr != null; + } Review comment: - This could be renamed `isGenerated` to stay in sync with `TableColumn` - `return expr != null;` is enough
[GitHub] [flink] danny0405 commented on a change in pull request #13352: [FLINK-19092][sql-parser] Parse comment on computed column failed
danny0405 commented on a change in pull request #13352: URL: https://github.com/apache/flink/pull/13352#discussion_r486043656 ## File path: flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java ## @@ -236,12 +232,12 @@ public String getColumnSqlString() { writer.startList("", ""); for (SqlNode column : columnList) { writer.sep(","); - if (column instanceof SqlTableColumn) { - SqlTableColumn tableColumn = (SqlTableColumn) column; - tableColumn.getName().unparse(writer, 0, 0); - } else { - column.unparse(writer, 0, 0); + SqlTableColumn tableColumn = (SqlTableColumn) column; + if (tableColumn.isComputed()) { + tableColumn.getExpr().unparse(writer, 0, 0); Review comment: We can encapsulate all the unparse logic in `SqlTableColumn`. E.g. the code can be as simple as: ```java column.unparse(writer, 0, 0) ```
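The encapsulation danny0405 suggests can be illustrated with a simplified model. The class names and the use of `StringBuilder` in place of Calcite's `SqlWriter` are assumptions for the sketch, not the parser's real API; the point is that the caller no longer branches on computed vs. physical columns.

```java
public class UnparseDemo {

    // Each column kind knows how to unparse itself.
    abstract static class Column {
        final String name;
        Column(String name) { this.name = name; }
        abstract void unparse(StringBuilder out);
    }

    static final class PhysicalColumn extends Column {
        final String type;
        PhysicalColumn(String name, String type) { super(name); this.type = type; }
        @Override void unparse(StringBuilder out) { out.append(name).append(' ').append(type); }
    }

    static final class ComputedColumn extends Column {
        final String expr;
        ComputedColumn(String name, String expr) { super(name); this.expr = expr; }
        @Override void unparse(StringBuilder out) { out.append(name).append(" AS ").append(expr); }
    }

    static String unparseAll(Column... columns) {
        StringBuilder out = new StringBuilder();
        for (int i = 0; i < columns.length; i++) {
            if (i > 0) out.append(", ");
            columns[i].unparse(out); // no instanceof / isComputed branching here
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(unparseAll(
                new PhysicalColumn("id", "BIGINT"),
                new ComputedColumn("ts3", "CAST(ts AS TIMESTAMP(3))")));
        // id BIGINT, ts3 AS CAST(ts AS TIMESTAMP(3))
    }
}
```

Pushing the branch into the column classes keeps `getColumnSqlString()`-style callers down to a single polymorphic call, which is exactly the `column.unparse(writer, 0, 0)` simplification the reviewer asks for.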
[GitHub] [flink] flinkbot edited a comment on pull request #13276: [FLINK-18604][connectors/HBase] HBase ConnectorDescriptor can not work in Table API
flinkbot edited a comment on pull request #13276: URL: https://github.com/apache/flink/pull/13276#issuecomment-682412809 ## CI report: * 1c35e543e110cd37dea22c6d7ee47f26951c58d5 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6393) * bda0e67535a7df0972ab2507867e440f4ac51022 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6414) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] ThunderSuuuuu closed pull request #11706: [FLINK-17079][CsvTableSinkFactoryBase] add numFiles and writeMode config by user's config in CsvTableSinkFactoryBase when create CsvTableSink
ThunderSu closed pull request #11706: URL: https://github.com/apache/flink/pull/11706
[GitHub] [flink] Shawn-Hx commented on pull request #13308: [hotfix][docs-zh] Fix invalid links in "Concepts & Common API" page of "Table API & SQL"
Shawn-Hx commented on pull request #13308: URL: https://github.com/apache/flink/pull/13308#issuecomment-689947467 kindly ping @klion26
[GitHub] [flink] flinkbot edited a comment on pull request #13327: [FLINK-19134][python] Add BasicArrayTypeInfo and coder for PrimitiveArrayTypeInfo for Python DataStream API.
flinkbot edited a comment on pull request #13327: URL: https://github.com/apache/flink/pull/13327#issuecomment-687006937 ## CI report: * 20561ace012e47a46125b1903ae0ffccd496ed9a Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6396) * 018477dfaeb5697e3e24ddd04884535e3e3a3d8a Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6413) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #13003: [FLINK-18737][docs]translate jdbc connector
flinkbot edited a comment on pull request #13003: URL: https://github.com/apache/flink/pull/13003#issuecomment-664844064 ## CI report: * 565d353b41557312917ef867210bb731dd972fe7 UNKNOWN * 148cef0724df6b33f6bae78619f42a3dce360ccd Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6042) * 5814af2fbd77b09bcd257ec8832d093e9a046098 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6412) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #13214: [FLINK-18938][tableSQL/API] Throw better exception message for quering sink-only connector
flinkbot edited a comment on pull request #13214: URL: https://github.com/apache/flink/pull/13214#issuecomment-678119788 ## CI report: * 8c8fcc0f241851849e0b3faefa88aea5d6649662 UNKNOWN * 734bb7cb193a2d5067a957ce2def7a926fe09589 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6375) * aa78f74d8efbf509e2d9247d43cd68ff8e9db0b0 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-17137) Support mini batch for WindowOperator in blink planner
[ https://issues.apache.org/jira/browse/FLINK-17137?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17193316#comment-17193316 ] tartarus commented on FLINK-17137: -- [~libenchao] hello, Is this to reduce state access to improve performance? It seems that the benefits will be more obvious under rocksdb. > Support mini batch for WindowOperator in blink planner > -- > > Key: FLINK-17137 > URL: https://issues.apache.org/jira/browse/FLINK-17137 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner, Table SQL / Runtime >Reporter: Benchao Li >Priority: Major > > Currently only regular aggregate and deduplicate support mini batch. > WindowOperator is a very frequently used operator in Flink, it's very helpful > to support mini batch for it. > Design document: > https://docs.google.com/document/d/1GYlrg8dkYcw5fuq1HptdA3lygXrS_VRbnI8NXoEtZCg/edit?usp=sharing > cc [~jark] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] shengjk commented on pull request #13118: [FLINK-18729][Connectors / Kafka] Make flink streaming kafka producer auto discovery partition
shengjk commented on pull request #13118: URL: https://github.com/apache/flink/pull/13118#issuecomment-689938111 Is there any progress on this PR?
[jira] [Updated] (FLINK-19181) Make python processes respect the calculated managed memory fraction
[ https://issues.apache.org/jira/browse/FLINK-19181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xintong Song updated FLINK-19181: - Component/s: API / Python > Make python processes respect the calculated managed memory fraction > > > Key: FLINK-19181 > URL: https://issues.apache.org/jira/browse/FLINK-19181 > Project: Flink > Issue Type: Sub-task > Components: API / Python >Reporter: Xintong Song >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-19181) Make python processes respect the calculated managed memory fraction
[ https://issues.apache.org/jira/browse/FLINK-19181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xintong Song updated FLINK-19181: - Fix Version/s: 1.12.0 > Make python processes respect the calculated managed memory fraction > > > Key: FLINK-19181 > URL: https://issues.apache.org/jira/browse/FLINK-19181 > Project: Flink > Issue Type: Sub-task > Components: API / Python >Reporter: Xintong Song >Priority: Major > Fix For: 1.12.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-19182) Update document for intra-slot managed memory sharing
[ https://issues.apache.org/jira/browse/FLINK-19182?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xintong Song updated FLINK-19182: - Component/s: Documentation > Update document for intra-slot managed memory sharing > - > > Key: FLINK-19182 > URL: https://issues.apache.org/jira/browse/FLINK-19182 > Project: Flink > Issue Type: Sub-task > Components: Documentation >Reporter: Xintong Song >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-19182) Update document for intra-slot managed memory sharing
[ https://issues.apache.org/jira/browse/FLINK-19182?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xintong Song updated FLINK-19182: - Fix Version/s: 1.12.0 > Update document for intra-slot managed memory sharing > - > > Key: FLINK-19182 > URL: https://issues.apache.org/jira/browse/FLINK-19182 > Project: Flink > Issue Type: Sub-task > Components: Documentation >Reporter: Xintong Song >Priority: Major > Fix For: 1.12.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-19180) Make RocksDB respect the calculated managed memory fraction
[ https://issues.apache.org/jira/browse/FLINK-19180?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xintong Song updated FLINK-19180: - Component/s: Runtime / State Backends > Make RocksDB respect the calculated managed memory fraction > --- > > Key: FLINK-19180 > URL: https://issues.apache.org/jira/browse/FLINK-19180 > Project: Flink > Issue Type: Sub-task > Components: Runtime / State Backends >Reporter: Xintong Song >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-19180) Make RocksDB respect the calculated managed memory fraction
[ https://issues.apache.org/jira/browse/FLINK-19180?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xintong Song updated FLINK-19180: - Fix Version/s: 1.12.0 > Make RocksDB respect the calculated managed memory fraction > --- > > Key: FLINK-19180 > URL: https://issues.apache.org/jira/browse/FLINK-19180 > Project: Flink > Issue Type: Sub-task > Components: Runtime / State Backends >Reporter: Xintong Song >Priority: Major > Fix For: 1.12.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-19178) Introduce the memory weights configuration option
[ https://issues.apache.org/jira/browse/FLINK-19178?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xintong Song updated FLINK-19178: - Component/s: Runtime / Configuration > Introduce the memory weights configuration option > - > > Key: FLINK-19178 > URL: https://issues.apache.org/jira/browse/FLINK-19178 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Configuration >Reporter: Xintong Song >Priority: Major > Fix For: 1.12.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-19179) Implement the managed memory fraction calculation logic
[ https://issues.apache.org/jira/browse/FLINK-19179?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xintong Song updated FLINK-19179: - Component/s: Runtime / Coordination > Implement the managed memory fraction calculation logic > --- > > Key: FLINK-19179 > URL: https://issues.apache.org/jira/browse/FLINK-19179 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: Xintong Song >Priority: Major > > This also means migrating the batch operator use cases. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (FLINK-19178) Introduce the memory weights configuration option
[ https://issues.apache.org/jira/browse/FLINK-19178?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xintong Song reassigned FLINK-19178: Assignee: Xintong Song > Introduce the memory weights configuration option > - > > Key: FLINK-19178 > URL: https://issues.apache.org/jira/browse/FLINK-19178 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Configuration >Reporter: Xintong Song >Assignee: Xintong Song >Priority: Major > Fix For: 1.12.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-19179) Implement the managed memory fraction calculation logic
[ https://issues.apache.org/jira/browse/FLINK-19179?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xintong Song updated FLINK-19179: - Fix Version/s: 1.12.0 > Implement the managed memory fraction calculation logic > --- > > Key: FLINK-19179 > URL: https://issues.apache.org/jira/browse/FLINK-19179 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: Xintong Song >Priority: Major > Fix For: 1.12.0 > > > This also means migrating the batch operator use cases. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-19178) Introduce the memory weights configuration option
[ https://issues.apache.org/jira/browse/FLINK-19178?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xintong Song updated FLINK-19178: - Fix Version/s: 1.12.0 > Introduce the memory weights configuration option > - > > Key: FLINK-19178 > URL: https://issues.apache.org/jira/browse/FLINK-19178 > Project: Flink > Issue Type: Sub-task >Reporter: Xintong Song >Priority: Major > Fix For: 1.12.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-19181) Make python processes respect the calculated managed memory fraction
[ https://issues.apache.org/jira/browse/FLINK-19181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xintong Song updated FLINK-19181: - Summary: Make python processes respect the calculated managed memory fraction (was: Make python processes respect the calculated fraction) > Make python processes respect the calculated managed memory fraction > > > Key: FLINK-19181 > URL: https://issues.apache.org/jira/browse/FLINK-19181 > Project: Flink > Issue Type: Sub-task >Reporter: Xintong Song >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-19179) Implement the managed memory fraction calculation logic
[ https://issues.apache.org/jira/browse/FLINK-19179?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xintong Song updated FLINK-19179: - Summary: Implement the managed memory fraction calculation logic (was: Implement the new fraction calculation logic) > Implement the managed memory fraction calculation logic > --- > > Key: FLINK-19179 > URL: https://issues.apache.org/jira/browse/FLINK-19179 > Project: Flink > Issue Type: Sub-task >Reporter: Xintong Song >Priority: Major > > This also means migrating the batch operator use cases. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-19180) Make RocksDB respect the calculated managed memory fraction
[ https://issues.apache.org/jira/browse/FLINK-19180?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xintong Song updated FLINK-19180: - Summary: Make RocksDB respect the calculated managed memory fraction (was: Make RocksDB respect the calculated fraction) > Make RocksDB respect the calculated managed memory fraction > --- > > Key: FLINK-19180 > URL: https://issues.apache.org/jira/browse/FLINK-19180 > Project: Flink > Issue Type: Sub-task >Reporter: Xintong Song >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19181) Make python processes respect the calculated fraction
Xintong Song created FLINK-19181: Summary: Make python processes respect the calculated fraction Key: FLINK-19181 URL: https://issues.apache.org/jira/browse/FLINK-19181 Project: Flink Issue Type: Sub-task Reporter: Xintong Song -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19182) Update document for intra-slot managed memory sharing
Xintong Song created FLINK-19182: Summary: Update document for intra-slot managed memory sharing Key: FLINK-19182 URL: https://issues.apache.org/jira/browse/FLINK-19182 Project: Flink Issue Type: Sub-task Reporter: Xintong Song -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19180) Make RocksDB respect the calculated fraction
Xintong Song created FLINK-19180: Summary: Make RocksDB respect the calculated fraction Key: FLINK-19180 URL: https://issues.apache.org/jira/browse/FLINK-19180 Project: Flink Issue Type: Sub-task Reporter: Xintong Song -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19179) Implement the new fraction calculation logic
Xintong Song created FLINK-19179: Summary: Implement the new fraction calculation logic Key: FLINK-19179 URL: https://issues.apache.org/jira/browse/FLINK-19179 Project: Flink Issue Type: Sub-task Reporter: Xintong Song This also means migrating the batch operator use cases. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19178) Introduce the memory weights configuration option
Xintong Song created FLINK-19178: Summary: Introduce the memory weights configuration option Key: FLINK-19178 URL: https://issues.apache.org/jira/browse/FLINK-19178 Project: Flink Issue Type: Sub-task Reporter: Xintong Song -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-19167) Proccess Function Example could not work
[ https://issues.apache.org/jira/browse/FLINK-19167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17193306#comment-17193306 ] tinny cat commented on FLINK-19167: --- the watermark is: {code:java} stream .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks() { private long currentMaxTimestamp = 0L; private long maxOutOfOrderness = 1L; private Watermark watermark = null; @Override public long extractTimestamp(UserAction element, long previousElementTimestamp) { long timestamp = element.viewTime; currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp); } @Nullable @Override public Watermark getCurrentWatermark() { watermark = new Watermark(currentMaxTimestamp - maxOutOfOrderness); return watermark; } }) {code} > Proccess Function Example could not work > > > Key: FLINK-19167 > URL: https://issues.apache.org/jira/browse/FLINK-19167 > Project: Flink > Issue Type: Bug > Components: API / DataStream >Affects Versions: 1.11.1 >Reporter: tinny cat >Priority: Major > > Section "*Porccess Function Example*" of > [https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html] > current is: > {code:java} > // Some comments here > @Override > public void processElement( > Tuple2 value, > Context ctx, > Collector> out) throws Exception { > // retrieve the current count > CountWithTimestamp current = state.value(); > if (current == null) { > current = new CountWithTimestamp(); > current.key = value.f0; > } > // update the state's count > current.count++; > // set the state's timestamp to the record's assigned event time > timestamp > current.lastModified = ctx.timestamp(); > // write the state back > state.update(current); > // schedule the next timer 60 seconds from the current event time > ctx.timerService().registerEventTimeTimer(current.lastModified + > 6); > } > @Override > public void onTimer( > long timestamp, > OnTimerContext ctx, > Collector> out) throws Exception { > // get the state for the 
key that scheduled the timer > CountWithTimestamp result = state.value(); > // check if this is an outdated timer or the latest timer > // this will be never happened > if (timestamp == result.lastModified + 6) { > // emit the state on timeout > out.collect(new Tuple2(result.key, result.count)); > } > } > {code} > however, it should be: > {code:java} > @Override > public void processElement( > Tuple2 value, > Context ctx, > Collector> out) throws Exception { > // retrieve the current count > CountWithTimestamp current = state.value(); > if (current == null) { > current = new CountWithTimestamp(); > current.key = value.f0; > } > // update the state's count > current.count++; > // set the state's timestamp to the record's assigned event time > timestamp > // it should be the previous watermark > current.lastModified = ctx.timerService().currentWatermark(); > // write the state back > state.update(current); > // schedule the next timer 60 seconds from the current event time > ctx.timerService().registerEventTimeTimer(current.lastModified + > 6); > } > @Override > public void onTimer( > long timestamp, > OnTimerContext ctx, > Collector> out) throws Exception { > // get the state for the key that scheduled the timer > CountWithTimestamp result = state.value(); > // check if this is an outdated timer or the latest timer > if (timestamp == result.lastModified + 6) { > // emit the state on timeout > out.collect(new Tuple2(result.key, result.count)); > } > } > {code} > `current.lastModified = ctx.timestamp();` should be ` current.lastModified = > ctx.timerService().currentWatermark();` otherwise, `timestamp == > result.lastModified + 6` will be never happend -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-19167) Proccess Function Example could not work
[ https://issues.apache.org/jira/browse/FLINK-19167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17193305#comment-17193305 ] tinny cat commented on FLINK-19167: --- however, I set the `TimeCharacteristic` was event time, and assign the watermark as event time。 The conclusion I got is: If ctx.timestamp() is assigned to `current.lastModified`, This code as follow will be never execute: {code:java} if (timestamp == result.lastModified + 6) { // emit the state on timeout out.collect(new Tuple2(result.key, result.count)); } {code} because, `timestamp` always equals result.lastModified > Proccess Function Example could not work > > > Key: FLINK-19167 > URL: https://issues.apache.org/jira/browse/FLINK-19167 > Project: Flink > Issue Type: Bug > Components: API / DataStream >Affects Versions: 1.11.1 >Reporter: tinny cat >Priority: Major > > Section "*Porccess Function Example*" of > [https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html] > current is: > {code:java} > // Some comments here > @Override > public void processElement( > Tuple2 value, > Context ctx, > Collector> out) throws Exception { > // retrieve the current count > CountWithTimestamp current = state.value(); > if (current == null) { > current = new CountWithTimestamp(); > current.key = value.f0; > } > // update the state's count > current.count++; > // set the state's timestamp to the record's assigned event time > timestamp > current.lastModified = ctx.timestamp(); > // write the state back > state.update(current); > // schedule the next timer 60 seconds from the current event time > ctx.timerService().registerEventTimeTimer(current.lastModified + > 6); > } > @Override > public void onTimer( > long timestamp, > OnTimerContext ctx, > Collector> out) throws Exception { > // get the state for the key that scheduled the timer > CountWithTimestamp result = state.value(); > // check if this is an outdated timer or the latest timer > // this will 
be never happened > if (timestamp == result.lastModified + 6) { > // emit the state on timeout > out.collect(new Tuple2(result.key, result.count)); > } > } > {code} > however, it should be: > {code:java} > @Override > public void processElement( > Tuple2 value, > Context ctx, > Collector> out) throws Exception { > // retrieve the current count > CountWithTimestamp current = state.value(); > if (current == null) { > current = new CountWithTimestamp(); > current.key = value.f0; > } > // update the state's count > current.count++; > // set the state's timestamp to the record's assigned event time > timestamp > // it should be the previous watermark > current.lastModified = ctx.timerService().currentWatermark(); > // write the state back > state.update(current); > // schedule the next timer 60 seconds from the current event time > ctx.timerService().registerEventTimeTimer(current.lastModified + > 6); > } > @Override > public void onTimer( > long timestamp, > OnTimerContext ctx, > Collector> out) throws Exception { > // get the state for the key that scheduled the timer > CountWithTimestamp result = state.value(); > // check if this is an outdated timer or the latest timer > if (timestamp == result.lastModified + 6) { > // emit the state on timeout > out.collect(new Tuple2(result.key, result.count)); > } > } > {code} > `current.lastModified = ctx.timestamp();` should be ` current.lastModified = > ctx.timerService().currentWatermark();` otherwise, `timestamp == > result.lastModified + 6` will be never happend -- This message was sent by Atlassian Jira (v8.3.4#803005)
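Setting aside which timestamp the example should store, the intent of the `timestamp == result.lastModified + interval` guard can be shown with a small standalone simulation (no Flink runtime involved; the interval value below is illustrative, not the one from the docs): each update re-registers a timer at `lastModified + INTERVAL`, and the guard lets only the timer belonging to the latest update emit.

```java
import java.util.ArrayList;
import java.util.List;

// Standalone simulation of the onTimer guard discussed above.
public class TimerGuardSketch {
    static final long INTERVAL = 60_000L; // illustrative interval

    public static void main(String[] args) {
        long[] updateTimes = {1_000L, 5_000L, 9_000L}; // element timestamps
        long lastModified = 0L;
        List<Long> registeredTimers = new ArrayList<>();
        for (long t : updateTimes) {
            lastModified = t;                   // state update on each element
            registeredTimers.add(t + INTERVAL); // registerEventTimeTimer(...)
        }
        // When the timers eventually fire, only the most recent one passes
        // the guard; the earlier two are treated as outdated.
        for (long timerTs : registeredTimers) {
            boolean fires = timerTs == lastModified + INTERVAL;
            System.out.println("timer@" + timerTs + " emits=" + fires);
        }
        // prints emits=true only for timer@69000
    }
}
```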
[jira] [Created] (FLINK-19177) FLIP-141: Intra-Slot Managed Memory Sharing
Xintong Song created FLINK-19177: Summary: FLIP-141: Intra-Slot Managed Memory Sharing Key: FLINK-19177 URL: https://issues.apache.org/jira/browse/FLINK-19177 Project: Flink Issue Type: Improvement Components: Runtime / Coordination Reporter: Xintong Song Assignee: Xintong Song Fix For: 1.12.0 This is the umbrella ticket of [FLIP-141: Intra-Slot Managed Memory Sharing|https://cwiki.apache.org/confluence/display/FLINK/FLIP-141%3A+Intra-Slot+Managed+Memory+Sharing]. [FLIP-53|https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Operator+Resource+Management] introduced a fraction based approach for sharing managed memory within a slot. This approach needs to be extended as python operators, which also use managed memory, are introduced. This FLIP proposes a design for extending intra-slot managed memory sharing for python operators and other potential future managed memory use cases. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] liming30 commented on pull request #13109: [FLINK-18808][runtime/metrics] Include side outputs in numRecordsOut metric.
liming30 commented on pull request #13109: URL: https://github.com/apache/flink/pull/13109#issuecomment-689927793 Okay, I understand. I will continue to pay attention to the progress of this issue.
[GitHub] [flink] weizheng92 commented on pull request #13003: [FLINK-18737][docs]translate jdbc connector
weizheng92 commented on pull request #13003: URL: https://github.com/apache/flink/pull/13003#issuecomment-689924443 @XBaith Wu, could you please have a review again?
[jira] [Comment Edited] (FLINK-19171) K8s Resource Manager may lead to resource leak after pod deleted
[ https://issues.apache.org/jira/browse/FLINK-19171?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17193285#comment-17193285 ] Yi Tang edited comment on FLINK-19171 at 9/10/20, 1:27 AM: --- We can fix it by fetching the pod again before checking pod status. And if it doesn't exist, we can do the following terminated logic without stopping it. Think i can fix this issue. Can anyone help to confirm it? was (Author: yittg): We can fix it by fetching the pod again before checking pod status. And if it doesn't exist, we can do the following terminated logic without stopping it. > K8s Resource Manager may lead to resource leak after pod deleted > > > Key: FLINK-19171 > URL: https://issues.apache.org/jira/browse/FLINK-19171 > Project: Flink > Issue Type: Bug >Reporter: Yi Tang >Priority: Minor > > {code:java} > private void terminatedPodsInMainThread(List pods) { >getMainThreadExecutor().execute(() -> { > for (KubernetesPod pod : pods) { > if (pod.isTerminated()) { > ... > } > } >}); > } > {code} > Looks like that the RM only remove the pod from ledger if the pod > "isTerminated", > and the pod has been taken accounted after being created. > However, it is not complete by checking pod "isTerminated", e.g. a Pending > pod is deleted manually. > After that, a new job requires more resource can not trigger the allocation > of a new pod. > > Pls let me know if i misunderstand, thanks. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-19171) K8s Resource Manager may lead to resource leak after pod deleted
[ https://issues.apache.org/jira/browse/FLINK-19171?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17193285#comment-17193285 ] Yi Tang commented on FLINK-19171: - We can fix it by fetching the pod again before checking pod status. And if it doesn't exist, we can do the following terminated logic without stopping it. > K8s Resource Manager may lead to resource leak after pod deleted > > > Key: FLINK-19171 > URL: https://issues.apache.org/jira/browse/FLINK-19171 > Project: Flink > Issue Type: Bug >Reporter: Yi Tang >Priority: Minor > > {code:java} > private void terminatedPodsInMainThread(List pods) { >getMainThreadExecutor().execute(() -> { > for (KubernetesPod pod : pods) { > if (pod.isTerminated()) { > ... > } > } >}); > } > {code} > Looks like that the RM only remove the pod from ledger if the pod > "isTerminated", > and the pod has been taken accounted after being created. > However, it is not complete by checking pod "isTerminated", e.g. a Pending > pod is deleted manually. > After that, a new job requires more resource can not trigger the allocation > of a new pod. > > Pls let me know if i misunderstand, thanks. -- This message was sent by Atlassian Jira (v8.3.4#803005)
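The proposed check can be sketched standalone (the API-server lookup below is a hypothetical stand-in for the Kubernetes client, not Flink's actual `KubernetesPod` API): re-fetch the pod before trusting its cached status, and treat a pod that no longer exists — e.g. a Pending pod deleted manually — the same as a terminated one, so its bookkeeping entry is released.

```java
import java.util.Map;
import java.util.Optional;

// Standalone sketch of the proposed fix for the pending-pod leak.
public class PodCheckSketch {
    // Hypothetical stand-in for the Kubernetes client's pod lookup:
    // returns the pod phase, or empty if the pod was deleted.
    static Optional<String> fetchPodPhase(Map<String, String> apiServer, String name) {
        return Optional.ofNullable(apiServer.get(name));
    }

    static boolean shouldHandleAsTerminated(Map<String, String> apiServer, String name) {
        Optional<String> phase = fetchPodPhase(apiServer, name);
        // A missing pod OR a terminal phase => run the terminated-pod logic.
        return phase.isEmpty()
            || phase.get().equals("Failed")
            || phase.get().equals("Succeeded");
    }

    public static void main(String[] args) {
        Map<String, String> apiServer = Map.of("pod-a", "Pending", "pod-b", "Failed");
        System.out.println(shouldHandleAsTerminated(apiServer, "pod-a")); // false: still pending
        System.out.println(shouldHandleAsTerminated(apiServer, "pod-b")); // true: terminal phase
        System.out.println(shouldHandleAsTerminated(apiServer, "pod-c")); // true: deleted manually
    }
}
```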
[jira] [Commented] (FLINK-17274) Maven: Premature end of Content-Length delimited message body
[ https://issues.apache.org/jira/browse/FLINK-17274?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17193284#comment-17193284 ] Dian Fu commented on FLINK-17274: - https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=6409=logs=d47ab8d2-10c7-5d9e-8178-ef06a797a0d8=dbd54e26-95e0-584b-4a47-190a8df6e3ac > Maven: Premature end of Content-Length delimited message body > - > > Key: FLINK-17274 > URL: https://issues.apache.org/jira/browse/FLINK-17274 > Project: Flink > Issue Type: Bug > Components: Build System / Azure Pipelines >Reporter: Robert Metzger >Assignee: Robert Metzger >Priority: Critical > Labels: test-stability > Fix For: 1.12.0 > > > CI: > https://dev.azure.com/rmetzger/Flink/_build/results?buildId=7786=logs=52b61abe-a3cc-5bde-cc35-1bbe89bb7df5=54421a62-0c80-5aad-3319-094ff69180bb > {code} > [ERROR] Failed to execute goal on project > flink-connector-elasticsearch7_2.11: Could not resolve dependencies for > project > org.apache.flink:flink-connector-elasticsearch7_2.11:jar:1.11-SNAPSHOT: Could > not transfer artifact org.apache.lucene:lucene-sandbox:jar:8.3.0 from/to > alicloud-mvn-mirror > (http://mavenmirror.alicloud.dak8s.net:/repository/maven-central/): GET > request of: org/apache/lucene/lucene-sandbox/8.3.0/lucene-sandbox-8.3.0.jar > from alicloud-mvn-mirror failed: Premature end of Content-Length delimited > message body (expected: 289920; received: 239832 -> [Help 1] > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-18117) "Kerberized YARN per-job on Docker test" fails with "Could not start hadoop cluster."
[ https://issues.apache.org/jira/browse/FLINK-18117?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17193282#comment-17193282 ] Dian Fu commented on FLINK-18117: - https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=6395=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=ff888d9b-cd34-53cc-d90f-3e446d355529 > "Kerberized YARN per-job on Docker test" fails with "Could not start hadoop > cluster." > - > > Key: FLINK-18117 > URL: https://issues.apache.org/jira/browse/FLINK-18117 > Project: Flink > Issue Type: Bug > Components: Deployment / YARN >Affects Versions: 1.11.0, 1.12.0 >Reporter: Robert Metzger >Priority: Critical > Labels: test-stability > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=2683=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=1e2bbe5b-4657-50be-1f07-d84bfce5b1f5 > {code} > 2020-06-04T06:03:53.2844296Z Creating slave1 ... [32mdone[0m > 2020-06-04T06:03:53.4981251Z [1BWaiting for hadoop cluster to come up. We > have been trying for 0 seconds, retrying ... > 2020-06-04T06:03:58.5980181Z Waiting for hadoop cluster to come up. We have > been trying for 5 seconds, retrying ... > 2020-06-04T06:04:03.6997087Z Waiting for hadoop cluster to come up. We have > been trying for 10 seconds, retrying ... > 2020-06-04T06:04:08.7910791Z Waiting for hadoop cluster to come up. We have > been trying for 15 seconds, retrying ... > 2020-06-04T06:04:13.8921621Z Waiting for hadoop cluster to come up. We have > been trying for 20 seconds, retrying ... > 2020-06-04T06:04:18.9648844Z Waiting for hadoop cluster to come up. We have > been trying for 25 seconds, retrying ... > 2020-06-04T06:04:24.0381851Z Waiting for hadoop cluster to come up. We have > been trying for 31 seconds, retrying ... > 2020-06-04T06:04:29.1220264Z Waiting for hadoop cluster to come up. We have > been trying for 36 seconds, retrying ... > 2020-06-04T06:04:34.1882187Z Waiting for hadoop cluster to come up. 
We have > been trying for 41 seconds, retrying ... > 2020-06-04T06:04:39.2784948Z Waiting for hadoop cluster to come up. We have > been trying for 46 seconds, retrying ... > 2020-06-04T06:04:44.3843337Z Waiting for hadoop cluster to come up. We have > been trying for 51 seconds, retrying ... > 2020-06-04T06:04:49.4703561Z Waiting for hadoop cluster to come up. We have > been trying for 56 seconds, retrying ... > 2020-06-04T06:04:54.5463207Z Waiting for hadoop cluster to come up. We have > been trying for 61 seconds, retrying ... > 2020-06-04T06:04:59.6650405Z Waiting for hadoop cluster to come up. We have > been trying for 66 seconds, retrying ... > 2020-06-04T06:05:04.7500168Z Waiting for hadoop cluster to come up. We have > been trying for 71 seconds, retrying ... > 2020-06-04T06:05:09.8177904Z Waiting for hadoop cluster to come up. We have > been trying for 76 seconds, retrying ... > 2020-06-04T06:05:14.9751297Z Waiting for hadoop cluster to come up. We have > been trying for 81 seconds, retrying ... > 2020-06-04T06:05:20.0336417Z Waiting for hadoop cluster to come up. We have > been trying for 87 seconds, retrying ... > 2020-06-04T06:05:25.1627704Z Waiting for hadoop cluster to come up. We have > been trying for 92 seconds, retrying ... > 2020-06-04T06:05:30.2583315Z Waiting for hadoop cluster to come up. We have > been trying for 97 seconds, retrying ... > 2020-06-04T06:05:35.3283678Z Waiting for hadoop cluster to come up. We have > been trying for 102 seconds, retrying ... > 2020-06-04T06:05:40.4184029Z Waiting for hadoop cluster to come up. We have > been trying for 107 seconds, retrying ... > 2020-06-04T06:05:45.5388372Z Waiting for hadoop cluster to come up. We have > been trying for 112 seconds, retrying ... > 2020-06-04T06:05:50.6155334Z Waiting for hadoop cluster to come up. We have > been trying for 117 seconds, retrying ... > 2020-06-04T06:05:55.7225186Z Command: start_hadoop_cluster failed. Retrying... 
> 2020-06-04T06:05:55.7237999Z Starting Hadoop cluster > 2020-06-04T06:05:56.5188293Z kdc is up-to-date > 2020-06-04T06:05:56.5292716Z master is up-to-date > 2020-06-04T06:05:56.5301735Z slave2 is up-to-date > 2020-06-04T06:05:56.5306179Z slave1 is up-to-date > 2020-06-04T06:05:56.6800566Z Waiting for hadoop cluster to come up. We have > been trying for 0 seconds, retrying ... > 2020-06-04T06:06:01.7668291Z Waiting for hadoop cluster to come up. We have > been trying for 5 seconds, retrying ... > 2020-06-04T06:06:06.8620265Z Waiting for hadoop cluster to come up. We have > been trying for 10 seconds, retrying ... > 2020-06-04T06:06:11.9753596Z Waiting for hadoop cluster to come up. We have > been trying for 15 seconds, retrying ... > 2020-06-04T06:06:17.0402846Z Waiting for hadoop cluster to come up. We have > been
[GitHub] [flink] crazyzhou commented on pull request #13303: [FLINK-19098][format] Make row data converters public
crazyzhou commented on pull request #13303: URL: https://github.com/apache/flink/pull/13303#issuecomment-689910720 @wuchong @danny0405 Can you help review this PR? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-playgrounds] morsapaes commented on a change in pull request #16: [FLINK-19145][walkthroughs] Add PyFlink-walkthrough to Flink playground.
morsapaes commented on a change in pull request #16: URL: https://github.com/apache/flink-playgrounds/pull/16#discussion_r485975110

## File path: pyflink-walkthrough/docker-compose.yml
## @@ -0,0 +1,96 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: '2.1'
+services:
+  jobmanager:
+    build: .
+    image: pyflink/pyflink:1.11.0-scala_2.11
+    volumes:
+      - .:/opt/pyflink-walkthrough
+    hostname: "jobmanager"
+    expose:
+      - "6123"
+    ports:
+      - "8081:8081"
+    command: jobmanager
+    environment:
+      - JOB_MANAGER_RPC_ADDRESS=jobmanager
+  taskmanager:
+    image: pyflink/pyflink:1.11.0-scala_2.11
+    volumes:
+      - .:/opt/pyflink-walkthrough
+    expose:
+      - "6121"
+      - "6122"
+    depends_on:
+      - jobmanager
+    command: taskmanager
+    links:
+      - jobmanager:jobmanager
+    environment:
+      - JOB_MANAGER_RPC_ADDRESS=jobmanager
+  zookeeper:
+    image: wurstmeister/zookeeper:3.4.6
+    ports:
+      - "2181:2181"
+  kafka:
+    image: wurstmeister/kafka:2.12-2.2.1
+    ports:
+      - "9092"
+    depends_on:
+      - zookeeper
+    environment:
+      HOSTNAME_COMMAND: "route -n | awk '/UG[ \t]/{print $$2}'"
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_CREATE_TOPICS: "payment_msg:1:1"
+    volumes:
+      - /var/run/docker.sock:/var/run/docker.sock
+  generator:
+    build: generator
+    image: generator:1.0
+    depends_on:
+      - kafka
+  elasticsearch:
+    image: docker.elastic.co/elasticsearch/elasticsearch:7.8.0
+    environment:
+      - cluster.name=docker-cluster
+      - bootstrap.memory_lock=true
+      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
+      - discovery.type=single-node
+    ports:
+      - "9200:9200"
+      - "9300:9300"
+    ulimits:
+      memlock:
+        soft: -1
+        hard: -1
+      nofile:
+        soft: 65536
+        hard: 65536
+  kibana:
+    image: docker.elastic.co/kibana/kibana:7.8.0
+    ports:
+      - "5601:5601"
+    depends_on:
+      - elasticsearch
+  load-kibaba-dashboad:

Review comment:
```suggestion
  load-kibana-dashboard:
```

## File path: pyflink-walkthrough/README.md
## @@ -0,0 +1,102 @@
+# pyflink-walkthrough
+
+## Background
+
+In this playground, you will learn how to manage and run PyFlink Jobs. The pipeline of this walkthrough reads data from Kafka, performs aggregations and writes results to Elasticsearch visualized via Kibana. The environment is managed by Docker so that all you need is a docker on your computer.
+
+- Kafka
+
+Kafka is used to store input data in this walkthrough. The script [generate_source_data.py](https://github.com/hequn8128/pyflink-walkthrough/blob/master/generate_source_data.py) is used to generate transaction data and writes into the payment_msg kafka topic. Each record includes 5 fields:
+```text
+{"createTime": "2020-08-12 06:29:02", "orderId": 1597213797, "payAmount": 28306.44976403719, "payPlatform": 0, "provinceId": 4}
+```
+```text
+createTime: The creation time of the transaction.
+orderId: The id of the current transaction.
+payAmount: The pay amount of the current transaction.
+payPlatform: The platform used to pay the order, pc or mobile.
+provinceId: The id of the province for the user.
+```
+
+- Generator
+
+A simple data generator is provided that continuously writes new records into Kafka.
+You can use the following command to read data in kafka and check whether the data is generated correctly.
+
+```shell script
+$ docker-compose exec kafka kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic payment_msg
+{"createTime":"2020-07-27 09:25:32.77","orderId":1595841867217,"payAmount":7732.44,"payPlatform":0,"provinceId":3}
+{"createTime":"2020-07-27 09:25:33.231","orderId":1595841867218,"payAmount":75774.05,"payPlatform":0,"provinceId":3}
+{"createTime":"2020-07-27 09:25:33.72","orderId":1595841867219,"payAmount":65908.55,"payPlatform":0,"provinceId":0}
+{"createTime":"2020-07-27
[jira] [Commented] (FLINK-19176) Support ScalaPB as a message payload serializer in Stateful Functions
[ https://issues.apache.org/jira/browse/FLINK-19176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17193217#comment-17193217 ] Galen Warren commented on FLINK-19176:
--
Sorry for the edits – I'm trying to get hyperlinks to appear correctly.

> Support ScalaPB as a message payload serializer in Stateful Functions
>
> Key: FLINK-19176
> URL: https://issues.apache.org/jira/browse/FLINK-19176
> Project: Flink
> Issue Type: Improvement
> Components: Stateful Functions
> Affects Versions: 2.0.0
> Reporter: Galen Warren
> Priority: Major
> Fix For: statefun-2.1.0
>
> Currently, Stateful Functions supports four options for serialization of message payloads:
> * Protobuf (based on code generated for Java)
> * Kryo
> * Multilanguage
> * Raw
> This proposal is to add a fifth option to this list, to support serialization of message payloads based on [ScalaPB|https://scalapb.github.io/docs/]. This would allow Scala developers to use ScalaPB-generated classes directly as message types in Stateful Functions, without having to convert to/from code generated for Java and/or raw byte[] messages.
> This would be a simple implementation, as there is already a [MessagePayloadSerializer|https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessagePayloadSerializer.java] interface that is implemented for each of the existing serialization methods; supporting ScalaPB would require a new class implementing MessagePayloadSerializer in terms of ScalaPB-generated code, and a means to select it, by adding a new value to the [MessageFactoryType|https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessageFactoryType.java] enumeration.
> Plus testing :)
> I've done this already locally; the implementation is very similar to the existing [MessagePayloadSerializerPb|https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessagePayloadSerializerPb.java] implementation. It uses a reference to ScalaPB in "provided" scope.
> Would you be interested in a pull request to add this? Primary benefit is to make it easy to use Stateful Functions in a Scala environment. Thanks.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-19176) Support ScalaPB as a message payload serializer in Stateful Functions
[ https://issues.apache.org/jira/browse/FLINK-19176?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Galen Warren updated FLINK-19176: - Description: Currently, Stateful Functions supports four options for serialization of message payloads: * Protobuf (based on code generated for Java) * Kryo * Multilanguage * Raw This proposal is to add a fifth option to this list, to support serialization of message payloads based on [ScalaPB |[https://scalapb.github.io/docs/]]. This would allow Scala developers to use ScalaPB-generated classes as message types in Stateful Functions directly as message types, without having to convert to/from code generated for Java and/or raw byte[] messages. This would be a simple implementation, as there is already a [MessagePayloadSerializer |[https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessagePayloadSerializer.java]] interface that is implemented for each of the existing serialization methods; supporting ScalaPB would require a new class implementing MessagePayloadSerializer in terms of ScalaPB-generated code, and a means to select it, by adding a new value to the [MessageFactoryType |[https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessageFactoryType.java]]enumeration. Plus testing :) I've done this already locally, the implementation is very similar to the existing [MessagePayloadSerializerPb |[https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessagePayloadSerializerPb.java]]implementation. It uses a reference to ScalaPB in "provided" scope. Would you be interested in a pull request to add this? 
Primary benefit is to make it easy to use Stateful Functions in a Scala environment. Thanks. was: Currently, Stateful Functions supports four options for serialization of message payloads: * Protobuf (based on code generated for Java) * Kryo * Multilanguage * Raw This proposal is to add a fifth option to this list, to support serialization of message payloads based on [ScalaPB]([https://scalapb.github.io/docs/]). This would allow Scala developers to use ScalaPB-generated classes as message types in Stateful Functions directly as message types, without having to convert to/from code generated for Java and/or raw byte[] messages. This would be a simple implementation, as there is already a [MessagePayloadSerializer |[https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessagePayloadSerializer.java]] interface that is implemented for each of the existing serialization methods; supporting ScalaPB would require a new class implementing MessagePayloadSerializer in terms of ScalaPB-generated code, and a means to select it, by adding a new value to the [MessageFactoryType |[https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessageFactoryType.java]]enumeration. Plus testing :) I've done this already locally, the implementation is very similar to the existing [MessagePayloadSerializerPb |[https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessagePayloadSerializerPb.java]]implementation. It uses a reference to ScalaPB in "provided" scope. Would you be interested in a pull request to add this? Primary benefit is to make it easy to use Stateful Functions in a Scala environment. Thanks. 
> Support ScalaPB as a message payload serializer in Stateful Functions > - > > Key: FLINK-19176 > URL: https://issues.apache.org/jira/browse/FLINK-19176 > Project: Flink > Issue Type: Improvement > Components: Stateful Functions >Affects Versions: 2.0.0 >Reporter: Galen Warren >Priority: Major > Fix For: statefun-2.1.0 > > > Currently, Stateful Functions supports four options for serialization of > message payloads: > * Protobuf (based on code generated for Java) > * Kryo > * Multilanguage > * Raw > This proposal is to add a fifth option to this list, to support serialization > of message payloads based on [ScalaPB |[https://scalapb.github.io/docs/]]. > This would allow Scala developers to use ScalaPB-generated
[jira] [Created] (FLINK-19176) Support ScalaPB as a message payload serializer in Stateful Functions
Galen Warren created FLINK-19176:
Summary: Support ScalaPB as a message payload serializer in Stateful Functions
Key: FLINK-19176
URL: https://issues.apache.org/jira/browse/FLINK-19176
Project: Flink
Issue Type: Improvement
Components: Stateful Functions
Affects Versions: 2.0.0
Reporter: Galen Warren
Fix For: statefun-2.1.0

Currently, Stateful Functions supports four options for serialization of message payloads:
* Protobuf (based on code generated for Java)
* Kryo
* Multilanguage
* Raw

This proposal is to add a fifth option to this list, to support serialization of message payloads based on [ScalaPB|https://scalapb.github.io/docs/]. This would allow Scala developers to use ScalaPB-generated classes directly as message types in Stateful Functions, without having to convert to/from code generated for Java and/or raw byte[] messages.

This would be a simple implementation, as there is already a [MessagePayloadSerializer|https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessagePayloadSerializer.java] interface that is implemented for each of the existing serialization methods; supporting ScalaPB would require a new class implementing MessagePayloadSerializer in terms of ScalaPB-generated code, and a means to select it, by adding a new value to the [MessageFactoryType|https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessageFactoryType.java] enumeration.

Plus testing :)

I've done this already locally; the implementation is very similar to the existing [MessagePayloadSerializerPb|https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessagePayloadSerializerPb.java] implementation. It uses a reference to ScalaPB in "provided" scope.

Would you be interested in a pull request to add this? Primary benefit is to make it easy to use Stateful Functions in a Scala environment. Thanks.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
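Mechanically, the proposal amounts to one new enum constant plus one new implementation of the serializer interface, selected by that constant. The following self-contained sketch illustrates that selection pattern only; the `PayloadSerializer` interface and `FactoryType` enum here are simplified, hypothetical stand-ins for statefun's actual `MessagePayloadSerializer` and `MessageFactoryType` (real constant names and method signatures differ), and the ScalaPB branch is stubbed rather than calling real ScalaPB-generated code.

```java
import java.nio.charset.StandardCharsets;
import java.util.EnumMap;
import java.util.Map;

public class SerializerSelectionSketch {

    // Simplified stand-in for statefun's MessagePayloadSerializer interface.
    interface PayloadSerializer {
        byte[] serialize(Object message);
    }

    // Simplified stand-in for MessageFactoryType, with the proposed new value.
    enum FactoryType {
        WITH_PROTOBUF_PAYLOADS,
        WITH_KRYO_PAYLOADS,
        WITH_RAW_PAYLOADS,
        WITH_SCALA_PB_PAYLOADS // the proposed addition
    }

    static final Map<FactoryType, PayloadSerializer> SERIALIZERS =
            new EnumMap<>(FactoryType.class);

    static {
        // Real implementations delegate to Protobuf / Kryo / ScalaPB; these
        // stubs only tag the bytes so the selection itself is observable.
        SERIALIZERS.put(
                FactoryType.WITH_RAW_PAYLOADS,
                m -> m.toString().getBytes(StandardCharsets.UTF_8));
        // A real ScalaPB serializer would use the generated message classes
        // (toByteArray / parseFrom on the companion object); stubbed here.
        SERIALIZERS.put(
                FactoryType.WITH_SCALA_PB_PAYLOADS,
                m -> ("scalapb:" + m).getBytes(StandardCharsets.UTF_8));
    }

    static PayloadSerializer forType(FactoryType type) {
        PayloadSerializer serializer = SERIALIZERS.get(type);
        if (serializer == null) {
            throw new IllegalArgumentException("no serializer registered for " + type);
        }
        return serializer;
    }
}
```

The point of the pattern is that adding ScalaPB support touches only the registration site and the new implementation class; existing serializers are untouched.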
[GitHub] [flink] flinkbot edited a comment on pull request #13367: [FLINK-17818][flink-java] Fixed the CSV Reader API: CSV Reader was not parsing data when no field names are passed as an argument. Ad
flinkbot edited a comment on pull request #13367: URL: https://github.com/apache/flink/pull/13367#issuecomment-689643831 ## CI report: * 817abe90ee59cb5896cf7681d47658a162758f58 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6405) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #13352: [FLINK-19092][sql-parser] Parse comment on computed column failed
flinkbot edited a comment on pull request #13352: URL: https://github.com/apache/flink/pull/13352#issuecomment-688773679 ## CI report: * 800d23e1a95bee277aa2dce51c0625901fae92ca Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6403) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #13366: [FLINK-17393][connector/common] Wakeup the SplitFetchers more elegantly.
flinkbot edited a comment on pull request #13366: URL: https://github.com/apache/flink/pull/13366#issuecomment-689608158 ## CI report: * 38b675997be828d79dc2b87bf195bf8962f1150e Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6404) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #13010: [FLINK-18735][table] Add support for more types to DataGen source
flinkbot edited a comment on pull request #13010: URL: https://github.com/apache/flink/pull/13010#issuecomment-665174560 ## CI report: * a96948229811ec6f8b72a2fd2ad019a9fa6b2d67 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6407) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #13303: [FLINK-19098][format] Make row data converters public
flinkbot edited a comment on pull request #13303: URL: https://github.com/apache/flink/pull/13303#issuecomment-685551684 ## CI report: * 9e3523a26f23656c5d25be589b295136e20f6f48 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6399) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #13010: [FLINK-18735][table] Add support for more types to DataGen source
flinkbot edited a comment on pull request #13010: URL: https://github.com/apache/flink/pull/13010#issuecomment-665174560 ## CI report: * 125b5ad0567b8ee2c5cfdae1768ae047e9c96cdd Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5416) * a96948229811ec6f8b72a2fd2ad019a9fa6b2d67 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] sjwiesman commented on pull request #13010: [FLINK-18735][table] Add support for more types to DataGen source
sjwiesman commented on pull request #13010: URL: https://github.com/apache/flink/pull/13010#issuecomment-689714414 @JingsongLi thank you for the thorough review. I've corrected the types and updated the test to validate them. If you don't have any other comments I'll merge on green. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #13179: [FLINK-18978][state-backends] Support full table scan of key and namespace from statebackend
flinkbot edited a comment on pull request #13179: URL: https://github.com/apache/flink/pull/13179#issuecomment-674957571 ## CI report: * b985e226543345724377647b73884a2527ac5b30 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6406) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] sjwiesman commented on a change in pull request #13010: [FLINK-18735][table] Add support for more types to DataGen source
sjwiesman commented on a change in pull request #13010: URL: https://github.com/apache/flink/pull/13010#discussion_r485789362 ## File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/DataGenVisitorBase.java ## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.factories.datagen; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.types.logical.DateType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.TimeType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.ZonedTimestampType; +import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor; + +import java.io.Serializable; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.util.function.Supplier; + +/** + * Base class for translating {@link LogicalType LogicalTypes} to {@link DataGeneratorContainer}'s. 
+ */ +public abstract class DataGenVisitorBase extends LogicalTypeDefaultVisitor<DataGeneratorContainer> { + + protected final String name; + + protected final ReadableConfig config; + + protected DataGenVisitorBase(String name, ReadableConfig config) { + this.name = name; + this.config = config; + } + + @Override + public DataGeneratorContainer visit(DateType dateType) { + return DataGeneratorContainer.of(TimeGenerator.of(LocalDate::now)); + } + + @Override + public DataGeneratorContainer visit(TimeType timeType) { + return DataGeneratorContainer.of(TimeGenerator.of(LocalTime::now)); + } + + @Override + public DataGeneratorContainer visit(TimestampType timestampType) { + return DataGeneratorContainer.of(TimeGenerator.of(LocalDateTime::now)); + } + + @Override + public DataGeneratorContainer visit(ZonedTimestampType zonedTimestampType) { + return DataGeneratorContainer.of(TimeGenerator.of(OffsetDateTime::now)); + } + + @Override + public DataGeneratorContainer visit(LocalZonedTimestampType localZonedTimestampType) { + return DataGeneratorContainer.of(TimeGenerator.of(Instant::now)); + } + + @Override + protected DataGeneratorContainer defaultMethod(LogicalType logicalType) { + throw new ValidationException("Unsupported type: " + logicalType); + } + + private interface SerializableSupplier<T> extends Supplier<T>, Serializable { } Review comment: It's not; the source gets caught in the closure cleaner without this. ## File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/types/DataGeneratorMapper.java ## @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package
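The DataGenVisitorBase diff above follows the visitor-with-default pattern from LogicalTypeDefaultVisitor: one override per supported logical type, and a defaultMethod that rejects everything else with a ValidationException. The following is a self-contained sketch of that pattern, not the Flink classes themselves; TypeVisitor and NowVisitor are simplified stand-in names.

```java
// Visitor-with-default pattern: every per-type visit falls back to
// defaultMethod unless a subclass overrides it, so new types fail loudly
// instead of silently producing nothing.
abstract class TypeVisitor<R> {
    R visitDate() { return defaultMethod("DATE"); }
    R visitTime() { return defaultMethod("TIME"); }

    // Subclasses decide what "unsupported" means (Flink throws ValidationException).
    abstract R defaultMethod(String typeName);
}

// A concrete visitor supporting only DATE, mirroring how DataGenVisitorBase
// maps each supported type to a generator and rejects the rest.
class NowVisitor extends TypeVisitor<String> {
    @Override
    String visitDate() { return java.time.LocalDate.now().toString(); }

    @Override
    String defaultMethod(String typeName) {
        throw new IllegalArgumentException("Unsupported type: " + typeName);
    }
}
```

The reviewer's note about SerializableSupplier reflects a Flink-specific constraint: method references like `LocalDate::now` are captured into the generator, so the supplier type must extend Serializable or the closure cleaner rejects the source.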
[GitHub] [flink] sjwiesman commented on a change in pull request #13010: [FLINK-18735][table] Add support for more types to DataGen source
sjwiesman commented on a change in pull request #13010: URL: https://github.com/apache/flink/pull/13010#discussion_r485782974 ## File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/DataGenTableSource.java ## @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.factories.datagen; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource; +import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator; +import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.connector.source.SourceFunctionProvider; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.datagen.types.RowDataGenerator; +import org.apache.flink.table.sources.StreamTableSource; + +/** + * A {@link StreamTableSource} that emits each number from a given interval exactly once, + * possibly in parallel. See {@link StatefulSequenceSource}. + */ +@Internal +public class DataGenTableSource implements ScanTableSource { + + private final DataGenerator[] fieldGenerators; + private final String tableName; + private final TableSchema schema; + private final long rowsPerSecond; + private final Long numberOfRows; + + public DataGenTableSource( + DataGenerator[] fieldGenerators, + String tableName, + TableSchema schema, + long rowsPerSecond, + Long numberOfRows) { + this.fieldGenerators = fieldGenerators; + this.tableName = tableName; + this.schema = schema; + this.rowsPerSecond = rowsPerSecond; + this.numberOfRows = numberOfRows; + } + + @Override + public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) { + boolean isBounded = numberOfRows == null; Review comment: nice catch This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
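The "nice catch" above refers to the flagged line `boolean isBounded = numberOfRows == null;`, whose check appears inverted: a DataGen source is bounded precisely when a total row count has been configured, and unbounded when `numberOfRows` is null. A minimal stand-in for the corrected logic, assuming those semantics:

```java
public class BoundednessCheck {
    // numberOfRows is a nullable Long: null means "emit forever",
    // a value means "stop after that many rows" - i.e. a bounded source.
    static boolean isBounded(Long numberOfRows) {
        return numberOfRows != null;
    }

    public static void main(String[] args) {
        System.out.println(isBounded(100L)); // row limit set -> bounded
        System.out.println(isBounded(null)); // no limit -> unbounded
    }
}
```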
[GitHub] [flink] sjwiesman commented on a change in pull request #13010: [FLINK-18735][table] Add support for more types to DataGen source
sjwiesman commented on a change in pull request #13010: URL: https://github.com/apache/flink/pull/13010#discussion_r485779807 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/DataGeneratorSource.java ## @@ -45,18 +62,34 @@ * @param generator data generator. */ public DataGeneratorSource(DataGenerator generator) { - this(generator, Long.MAX_VALUE); + this(generator, "generator", Long.MAX_VALUE, null); } /** * Creates a source that emits records by {@link DataGenerator}. * * @param generator data generator. * @param rowsPerSecond Control the emit rate. +* @param numberOfRows Total number of rows to output. */ - public DataGeneratorSource(DataGenerator generator, long rowsPerSecond) { + public DataGeneratorSource(DataGenerator generator, String name, long rowsPerSecond, Long numberOfRows) { this.generator = generator; + this.name = name; Review comment: that's a remnant from an old commit, will remove. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
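The constructor discussed above takes a `rowsPerSecond` argument to control the emit rate. As an illustration of what such throttling can look like (this is a hedged sketch, not Flink's actual DataGeneratorSource implementation, which also coordinates with checkpoint locks and task parallelism): emit up to the per-second budget, then sleep out the remainder of the current one-second window.

```java
// Simple token-per-window rate limiter: at most rowsPerSecond emissions
// per one-second window, sleeping when the budget is exhausted.
final class RateLimitedEmitter {
    private final long rowsPerSecond;
    private long emittedThisSecond = 0;
    private long windowStartMillis = System.currentTimeMillis();

    RateLimitedEmitter(long rowsPerSecond) {
        this.rowsPerSecond = rowsPerSecond;
    }

    void emit(Runnable record) throws InterruptedException {
        long now = System.currentTimeMillis();
        if (now - windowStartMillis >= 1000) {
            // A full second has elapsed: start a fresh window.
            windowStartMillis = now;
            emittedThisSecond = 0;
        }
        if (emittedThisSecond >= rowsPerSecond) {
            // Budget used up: wait until the window ends, then reset.
            Thread.sleep(1000 - (now - windowStartMillis));
            windowStartMillis = System.currentTimeMillis();
            emittedThisSecond = 0;
        }
        record.run();
        emittedThisSecond++;
    }
}
```

Combined with the nullable `numberOfRows` total from the same constructor, this gives the two knobs the PR exposes: how fast to emit, and (optionally) how many rows to emit in total.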